介绍

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

详细资料和源代码请看:Github

我们使用的某一个场景:使用DataX把数据从Oracle导入到Phoenix中,10万条数据120多个字段耗时20s。

Hadoop集群环境配置:5台 12核50G

安装

这里使用的源码编辑的方式安装:下载DataX源码,自己编译:DataX源码

注释:[DataX_source_code_home] 源代码根目录

(1) 环境确认:如果满足直接跳过到第二步

(2) 下载DataX源码:

1
2
$ git clone https://github.com/alibaba/DataX.git

(3) 通过maven打包:

1
2
$ cd  {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true

打包成功,日志显示如下:

1
2
3
4
5
6
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------

打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:

1
2
3
$ cd  {DataX_source_code_home}
$ ls ./target/datax/datax/
bin		conf		job		lib		log		log_perf	plugin		script		tmp

至此安装成功

配置job

  • 在根目录中找到job文件夹,会发现有一个job.json的默认文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{
    "job": {
        "setting": {
            "speed": {
                "byte":10485760
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19890604,
                                "type": "long"
                            },
                            {
                                "value": "1989-06-04 00:00:00",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 100000
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}
  • 返回根目录然后执行下面的命令:
1
./bin/datax.py job/job.json

是一个自带的例子,可以理解为用来学习和测试,同时也验证安装是否正常 执行结果下图,说明成功了。

1
2
3
4
5
6
7
8
2019-11-01 04:55:02.557 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-11-01 04:54:52
任务结束时刻                    : 2019-11-01 04:55:02
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0
  • 配置自己的job文件,这里的场景是从Oracle数据库中读数据到Phoenix的表中。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
{
    "job": {
        "setting": {
            "speed": {
                "channel": 16,
                "byte": 1048576,
                "record": 10000
            }
        },
        "content": [{
            "reader": {
                "name": "oraclereader",	
				//oracle的读插件其他的可以参考其他组件文档,可以在源码里面找对应的名称
                "parameter": {
                    "username": "system",	//用户名
                    "password": "Password_123",	//密码
                    "column": ["*"], //需要同步的列
                    "splitPk": "YBZJ",//分片的健,减少数据热点
                    "connection": [{
                        "table": ["XXZYGX.VW_ZYGX_TL_QZTLGJXX"],//需要同步的表名称
                        "jdbcUrl": ["jdbc:oracle:thin:@172.26.1.140:1521:orcl"]
						//描述的是到对端数据库的JDBC连接信息
                    }]
                }
            },
            "writer": {
                "name": "hbase20xsqlwriter",//连接Phoenix的插件名称
                "parameter": {
                    "batchSize": "100",//批量写入的最大行数
                    "column": ["ZJHM" ,"ZJLX" ,"XM" ,"MZ" ],//需要同步的列
                    "queryServerAddress": "http://10.10.1.85:8765",
					//Phoenix QueryServer地址,必填项,同时Phoenix的queryServere也必须开启
                    "nullMode": "skip",
					//读取到的列值为null时,如何处理.
					//skip:跳过这一列,即不插入这一列(如果该行的这一列之前已经存在,则会被删除)
					//empty:插入空值,值类型的空值是0,varchar的空值是空字符串
                    "table": "VW_ZYGX_TL_QZTLGJXX",//需要写入的表名称
                    "schema":"QH"//表所在的schema
                }
            }
        }]
    }
}

其他的数据库的相关配置读写,参考官方的路径其他的配置介绍 在每个插件的目录中的doc文件里面查看文档

FAQ

  • 打包一定报错、需要配置一下maven与pom.xml,才能打包

1.maven配置阿里云的maven私服

1
2
3
4
5
6
<mirror>
	<id>nexus-aliyun</id>
	<mirrorOf>central</mirrorOf>
	<name>Nexus aliyun</name>
	<url>https://maven.aliyun.com/repository/central</url>
 </mirror>

2.本地编译报错无法找到工件com.aliyun.openservices:tablestore-streamclient:jar:1.0.0-SNAPSHOT 解决:

1
2
3
4
5
6
vim $ {DataX_source_code_home} /otsstreamreader/pom.xml 
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-streamclient</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>

此处把<version>1.0.0-SNAPSHOT</version>改成<version>1.0.0</version>