Datax异构数据源数据同步

梁铭图 2019-09-19 11:30:00
 

​作者介绍

梁铭图,新炬网络首席架构师,十多年数据库运维、数据库设计、数据治理以及系统规划建设经验,拥有Oracle OCM、Togaf企业架构师(鉴定级)、IBM CATE等认证,曾获dbaplus年度MVP以及华为云MVP等荣誉,并参与数据资产管理国家标准的编写工作。在数据库运维管理和架构设计、运维体系规划、数据资产管理方面有深入研究。

 

作为数据库管理人员,有时不得不面对将大量数据在数据库之间同步的问题,数据库类型、时间窗口、成本、数据内容特点等诸多因素都是需要认真考虑的。本次的案例中,就涉及到一个从DB2到Oracle的数据库之间的数据迁移。在这个迁移过程中,由于种种原因GoldenGate,生成数据文件、ETL工具等方式由于各种限制先后被排除,最终我们采用了DataX工具完成本次迁移。

 

下文是DataX实现DB2到Oracle的数据库同步的测试过程。

 

一、什么是DataX

 

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。



 

GitHub提供了目前支持的数据源列表:

 

二、DataX核心架构

 

1、DataX核心组件
 

 


 

DataX核心组件主要分为

 

Reader:数据采集模块,负责从源采集数据。

 

Writer:数据写入模块,负责写入目标库。

 

Framework:数据传输通道,负责处理数据缓冲等。

 

2、DataX核心机制
 

 

 

DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。 

 

DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。 

 

切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。 

 

每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。 

 

DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

 

三、DataX测试配置

 

1、OB全局配置
 

 

JOB通过配置对同步整个过程进行管控。

 

 

    "job": {

        "setting": {

            "speed": {

                "byte": 1048576

            },

            "errorLimit": {

                "record": 0,

                "percentage": 0.02

            }

        },

 

speed为同步速度限制参数,这里有三个参数channel、record和byte。

 

channel:管道数,可以理解为并行数,需与splitPk一同使用,否则无效果。

 

record:每次同步多少条数据,取record和byte中的最小值。

 

byte:每次同步多少字节数据,取record和byte中的最小值。

 

errorLimit为错误数据限制,这里有两个参数record和percentage,指当异常数据达到多少时同步取消,取record和percentage的最小值。

 

2、Reader线程配置
 

 

RDBMS Reader通过JDBC连接器连接到远程的RDBMS数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程RDBMS数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

 

对于用户配置Table、Column、Where的信息,RDBMS Reader将其拼接为SQL语句发送到数据库;对于用户配置querySql信息,RDBMS直接将其发送到数据库。

 

这里配置一个从RDBMS数据库同步抽取DB2数据作业样例:

 

        "content": [

            {

                "reader": {

                    "name": "rdbmsreader",

                    "parameter": {

                        "username": "xxx",

                        "password": "xxx",

                        "column": [

                            "id",

                            "name"

                          “content”

                        ],

                        "splitPk": "pk",

                        "connection": [

                            {

                                "table": [

                                    "testtable"

                                ],

                                "jdbcUrl": [

                                    "jdbc:dm://xxx.xxx.xxx.xxx:50000/database"

                                ]

                            }

                        ],

                        "fetchSize": 1024,

                    }

                },

 

主要配置修改内容:

 

Username:源数据库的用户名。

 

Password:源数据库的密码。

 

Encoding:数据库字符集,GBK,UTF8等。

 

Column:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如['']。

 

支持列裁剪,即列可以挑选部分列进行导出。

 

支持列换序,即列可以不按照表schema信息进行导出。

 

支持常量配置,用户需要按照JSON格式: ["id", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id为普通列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。

 

Column必须显示填写,不允许为空!

 

splitPk:RDBMS Reader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。

 

推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

 

目前splitPk仅支持整形数据切分,不支持浮点、字符串型、日期等其他类型。如果用户指定其他非支持类型,RDBMSReader将报错!

 

注意:这里并非只能是主键,拥有唯一约束的列也可。

 

Table:所选取的需要同步的表名。

 

jdbcUrl:描述的是到对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息。请注意不同的数据库jdbc的格式是不同的,DataX会根据具体jdbc的格式选择合适的数据库驱动完成数据读取。

 

db2格式 jdbc:db2://ip:port/database

 

fetchSize:该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了DataX和服务器端的网络交互次数,能够较大的提升数据抽取性能。

 

注意:该值最大建议值为2048。

 

Where:筛选条件,RDBMSReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。例如在做测试时,可以将where条件指定为limit 10;在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。

 

3、Writer线程配置
 

 

Oracle Writer 插件实现了写入数据到 Oracle 主库的目的表的功能。在底层实现上, OracleWriter 通过 JDBC 连接远程 Oracle 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle,内部会分批次提交入库。

 

OracleWriter 面向ETL开发工程师,他们使用 OracleWriter 从数仓导入数据到 Oracle。同时 OracleWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

 

OracleWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL语句insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 。

 

对于使用datax同步到oracle的表,建议删除主键和所有索引,数据校验完成后再进行重建。这样做会极大提升数据同步的速度。

 

                "writer": {

                    "name": "oraclewriter",

                    "parameter": {

                        "username": "root",

                        "password": "root",

                        "column": [

                            "id",

                            "name"

                        ],

                        "preSql": [

                            "delete from test"

                        ],

                        "connection": [

                            {

                                "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",

                                "table": [

                                    "test"

                                ]

                            }

                        ]

                    }

 

主要配置内容

 

Username:目的数据库的用户名。

 

Password:目的数据库的密码。

 

Encoding:数据库字符集。

 

batchSize:一次性批量提交的记录数大小,该值可以极大减少DataX与Oracle的网络交互次数,并提升整体吞吐量。最大只能设置为1024。

 

column:表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。如果要依次写入全部列,使用表示, 例如: "column": [""]。

 

preSql:执行语句时会先检查是否存在若存在,根据条件删除。

 

jdbcUrl:目的数据库的 JDBC 连接信息。

 

table:目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。

 

同步测试记录
 

 

数据转换最高速度一条管道13000rec/s+,通过where手工对部分字段拆分进程进行同步,同步536250880条记录大约需要20分钟(容量300GB)的时间,基本满足数据迁移的要求。

 

四、小结

 

DataX是开源领域异构数据库之间数据同步的利器,从我们的测试来说他适用于以下特点:

 

它可简单配置就以实现异构数据库之间的数据迁移,表之间的结构相同,但中间不会做数据转换。

 

自动多任务的数据同步,加快数据同步速度。

 

可以实现多种数据源的批量同步和增量同步。

 

有开发能力的童鞋可以自己开发reader和writer接口,方便接入更多的数据源。

最新评论
访客 2023年08月20日

230721

访客 2023年08月16日

1、导入Mongo Monitor监控工具表结构(mongo_monitor…

访客 2023年08月04日

上面提到: 在问题描述的架构图中我们可以看到,Click…

访客 2023年07月19日

PMM不香吗?

访客 2023年06月20日

如今看都很棒

活动预告