DTS 数据同步集成 MaxCompute 数仓最佳实践

作者阿里云代理 文章分类 分类:linux图文教程 阅读次数 已被围观 632

DTS 数据同步集成 MaxCompute 数仓最佳实践


内容简介:

1.演示根底环境布置

2.MaxCompute 数仓建立


场景描述

本文 Step by Step介绍了经过数据传输服务 DTS 完结从云数据库 RDS 到MaxCompute的数据同步集成,并介绍如何运用 DTS 和 MaxCompute 数仓联合完结数据 ETL 幂等和数据生命周期快速回溯。


解决问题-

1.完结大数据实时同步集成。

2.完结数据 ETL 幂等。

3.完结数据生命周期快速回溯。


产品列表

·MaxCompute

·数据传输服务 DTS

·DataWorks

·云数据库 RDS MySQL 版


最佳实践概述

客户 T+1 数仓传统 ETL 存在以下痛点:

1)数据抽取不幂等或容错率低,如清晨0:00发动的 ETL 使命由于各种原因(数据库HA 切换、网络颤动或 MAXC 写入失败等)失败后,再次抽取无法获取0:00时的数据状况。

2)针对不标准规划表,如没有 create  time/update time 的前史留传表,传统ETL需全量抽取。

3)实时性差,抽取数据+重试使命往往需要 1-3小 时。

别的数据库的数据生命周期回溯难,如客户想回溯一年内某些数据的增修正生命周期,需要从 OSS 拉取一年的 binlog 解析和剖析,十分困难,费时吃力。

本文 Step by Step 介绍了经过数据传输服务 DTS 完结从云数据库 RDS 到MaxCompute 的数据同步集成,并介绍如何运用 DTS 和 MaxCompute 数仓联合完结

·DTS 简便易用,能够经过控制台一键完结同步,也能够经过 OpenAPI 批量生成。

·DTS 将 binlog 作为大数据同步的手段,能够完结 ETL 幂等,大大提高数据仓库的数据质量。

·针对不标准规划的表,依然能够经过 binlog 的时刻来生成创立和修正时刻。

·实时性提高,将清晨的批量抽取改为准实时同步,清晨 0:00 过后几分钟就能够开端批处理使命。

·经过 MaxCompute 的分布式核算才能,能够快速回溯数据的增修正生命周期。

 

1.演示根底环境布置

本章节在阿里云上建立演示根底环境云数据库 RDS MySQL 版并结构相关数据。本最佳实践引荐客户运用云数据库 RDS MySQL  版,但 对 ECS 上的自建数据库、以及经过专线/VP N 网关/智能网关接入的自建数据库相同适用。

1.1.环境布置

1.1.1.创立专有网络 VPC

1.1.2.布置云数据库 RDS

1.2.模仿数据结构

本节模仿结构电商数据库 shopping-db。shopping-db 一共包括三个表,分别为用户表 user、库存表 inventory 和订单表 orders。

过程1  在数据管理 DMS 控制台,挑选导航 SQL操>SQL 窗口。

过程2  复制并张贴以下 SQL 句子创立用户表 user、库存表 inventory 和订单表orders,单击履行(F8)。

(示例 demo 代码能够经过履行以下指令获取:

git clone git@code.aliyun.com:best-practice/146.git)

--用户/账户表

create table user(

user id bigint not null auto increment comment ‘user id用户ID’,

user name varchar(3o) comment ‘用户名’,

phone num varchar(20) comment ‘手机号’,

email varchar(100) comment ‘email’,

acct decimal(18,2) comment '账户余额,

primary key (user_id),

arcs

key l1 (user_name)

);

--库存表

create table inventory(

inventory_id bigint not nult auto_increment comment 'inventory_id库存产品ID',

inventory_name varchar(30) comment ‘产品名’,

price_unit decimal(18,2) comment ‘产品单价’,

inventory_num bigint not null default 0 comment ’剩余库存数’,

primary key(inventory_id)

);

--订单表

create table orders(

order_id bigint not null auto_increment comment 'order_id订单ID',

user_id. bigint not null comment 'user_id用户ID',

orice_unit decimal(18,2) comment '产品单价’,

order_num bigint not null default 0 comment ‘订单购买数’,

create_time datetime not null default current timestamp,

update_time datetime not nuit default current_timestamp on update current_timestamp,

primary key(ordex_id),

key l1 (user_id),

key l2(inventory_id),

);

 

2.MaxCompute 数仓建立

本章经过 DTS 完结从云数据库 RDS 到 MaxCompute,的数据同步集成,并介绍如何运用 DTS 和 MaxCompute 数仓联合完结数据ETL幂等和数据生命周期快速回溯。

2.1.注册 DataWorks

DataWorks:是一个供给了大数据 OS 才能、并以 all in one box 的方式供给专业高效、安全可靠的一站式大数据智能云研发平台。一起能满意用户对数据管理、质量管理需求,赋予用户对外供给数据服务的才能。

数据传输服务 DTS:数据传输服务 (Data Transmission Service)DTS 支撑关系型数据库、NoSQL、大数据(OLAP)等数据源间的数据传输。它是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务。数据传输致力于在公共云、混合云场景下,解决远距离、毫秒级异步数据传输难题。

MaxCompute:MaxCompute (原 ODPS)是一项大数据核算服务,它能供给快速、彻底保管的 PB 级数据仓库解决方案,使您能够经济并高效的剖析处理海量数据。-

2.3.创立 MaxCompute 项目

过程1  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

过程2  在左边导航栏挑选作业空间列表,切换地域为华东1(杭州),在作业空间列表页面,单击创立作业空间。

过程3  在根本装备页,输入作业空间称号 bp_data_warehouse,其他装备坚持默许,单击下一步。注:作业空间称号每 个 Region 内仅有,请自界说称号。 

过程4  在挑选引擎页里,在挑选核算引擎服务区域,勾选 MaxCompute 按量付费,单机下一步。 

过程5  在引擎详情页,输入实例显现称号和 MaxCompute 项目称号,单击创立作业空间。

l 实例显现称号:bp_data_warehouse

l MaxCompute 项目称号:bp_data_warehousee

l MaxCompute 访问身份:阿里云主账号

l Quota 组切换:按量付费默许资源

过程6  在作业间列表页面,能够查看到状况为正常的作业空间

2.4.DTS 数据同步集成 MaxCompute

依据需要同步的数据在写入后是否发生改变,分为稳定的存量数据(通常是日志数据)和持续更新的数据(例如本示例库存表中,产品单价会发生改变)。

对稳定的存量数据进行增量同步,由于数据生成后不会发生改变,因而能够很方便地依据数据的生成规则进行分区。较常见的是依据日期进行分区,例如每天1个分区。

依据数据仓库反映前史改变的特色,需要对持续更新的数据进行增量同步。

传统 ETL 东西供给每日增量、每日全量的上传方式,但由于数据库数据的增量上传是经过数据库供给数据改变的日期字段来完结,要求您的数据库有数据改变的日期字段。

数据传输服务 DTS 数据同步功用经过 binlog 完结。对无数据改变日期的表,依然能够经过 binlog 的时刻来生成创立/修正时刻,兼容性更好,一起运用DTS提高了数据集成的实时性,完结准实时同步。

2.4.1.1.装备 DTS 数据同步

留意事项:

1)DTS 在履行全量数据初始化时将占用源库和方针库一定的读写资源,可能会导致数据库的负载上升,在数据库功能较差、标准较低或业务量较大的情况下(例如源库有大量慢 SQL、存在无主键表或方针库存在死锁等),可能会加剧数据库压力,甚至导致数据库服务不可用。因而您需要在履行数据同步前评估源库和方针库的功能,一起主张您在业务低峰期履行数据同步(例如源库和方针库的 CPU 负载在30%以下)。

2)仅支撑表级别的数据同步。

3)在数据同步时,请勿对源库的同步目标运用 gh-ost 或 pt-online-schema-change 等相似东西履行在线 DDL 变 更,否则会导致同步失败。

4)由于 Maxcompute 不支撑主键束缚,当 DTS 在同步数据时因网络等原因触发重传,可能会导 致 MaxCompute 中呈现重复记录。

过程1  登录数据传输 DTS 控制台。(https://dts.console.aliyun.com/#/home/)

过程2  在左边导航栏挑选数据同步,在数据同步页面,挑选华东1(杭州),并单击创立同步作业。

过程3  挑选数据传输服务 DTS(后付费),完结以下装备,其他装备坚持默许,并单机立即购买。

过程4  在承认订单页面,承认各项参数信息。承认无误,阅读、同意并勾选《数据传输服务(后付费)服务协议》前的复选框,并单机去注册。

过程5  注册使命提交成功后,单机管理控制台回来。

过程6  定位至已购买的数据同步实例,单机装备网络链路。

过程7  装备同步通道的源实例及方针实例信息

装备完结,单机页面右下角的授权白名单并进行下一步。

过程8  单机页面右下角的下一步,答应将 MaxCompute 中的项目的下述权限授予给DTS同步账号,详情如下图所示。

过程9  装备同步策略和同步目标。

上述装备完结后,单机页面右下角的预查看并发动

过程10  等候同步作业的链路初始化完结,直至处于同步中状况。

能够在数据同步页面,查看数据同步作业的状况。

2.4.1.2.验证 DTS 数据实时同步

同步过程介绍:

1)结构初始化。

DTS 将源库中待同步表的结构界说信息同步至 MaxCompute 中,初始化时 DTS 会为表名添加_ base 后缀。例如源表为 user,那么 MaxCompute 中的表即为user_base.

2)全量数据初始化。

DTS将源库中待同步表的存量数据,悉数同步至 MaxCompute 中的方针表名 _base表中(例如从源库的 user 表同步至 MaxCompute 的 user_base,表),作为后续增量同步数据的基线数据。

3)增量数据同步。

DTS 在 MaxCompute 中创立一个增量日志表,表名为同步的方针表名 _log,例如user_log,然后将源库产生的增量数据实时同步到该表中。

增量日志表结构界说说明请参阅:

https://help.aliyun.com/document_detail/44547.html?#title-g2g-87l-hfi

过程1  请参阅1.1.2.2.创立数据库和账号的过程8登录RDS数据库。

过程2  在数据管理 DMS 控制台,挑选导航 SQ 操作>SQL窗口,复制并张贴如下SQL 句子写入测验数据,单击履行(F8)。

--写入测验用户

insert into user(user_name,phone_num,email,acct)

values(‘test_acct’,‘18866668888',‘test@test.com',300.99);

--写入测验产品及库存

insert into inventory(inventory_name,price_unit,inventory_num)

values(‘lPhone X’,4999,300);

过程3  登录 DataWorks 控制台 (https://workbench.data.aliyun.com/console)

过程4  在左边导航栏挑选作业空间列表,在作业空间列表页面,定位至已创立的作业空间,单击进入数据开发。

过程5  在左边导航栏挑选公共表,单击 user_log,单击数据预览。

能够查看到刚在数据库 shopping-db 的 user 表写入的用户 test_acct,现已实时同步到了 bp_data_warehouse 的 user_log 表。

假如没有呈现数据,请耐心等候 2~3 分钟后再查看。

举例调整 iphone 12  的价格

查看后发现数据已生成

2.4.2.新增表数据同步处理

DTS 支撑在数据同步的过程中新增同步目标。

过程1  登录 DTS 控制台。(https:/ldts.console.aliyun.com/#/home/)

过程2  左边导航栏,单击数据同步。

过程3  在同步作业列表页面顶部,挑选数据同步实例所属地域。

过程4  找到方针同步作业,单击其操作列的更多>修正同步目标。

过程5  在源库目标区域框中,单机需要新增的同步目标,并单机>向右小箭头移动至已挑选目标区域框中,

过程6  单机预查看并发动。

2.5.ETL幂等完结

依据等幂性准则,1个使命屡次运转的结果是一致的。

假如数据抽取不幂等,例如清晨0:00发动的 ETL 使命,由于各种原因(数据库HA切换、网络颤动或 MaxCompute 写入失败等)失败后,再次抽取将无法获取0:00时的数据状况。

DTS 运用 binlog 作为大数据同步的手段,将全量数据同步到 base 表,增量数据依靠binlog 实时同步到 log 表,统一做合表清洗能够拿到任意时刻点的快照数据,然后完结 ETL 幂等,大大提高数据仓库的数据质量。

过程1  请参阅1.1.2.2.创立数据库和账号的过程8登录 RDS 数据库。

过程2  在数据管理 DMS 控制台,挑选导航 SQL 操作>SQL 窗口,复制并张贴如下SQL 句子写入测验数据,单击履行 (F8)。

留意:在相隔一分钟,两次调整了产品名为 IPhone X 的产品单价。

--屡次调整价格 RDS

update inventory set price_unit=3999 where inventory_name=‘IPhone X’,

select sleep(60);

update inventory set price_unit=4999 where inventory name=‘IPhone X’,

过程3  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

过程4  在左边导航栏挑选作业空间列表,在作业空间列表页面,定位至已创立的作业空间,单击进入数据开发。

过程5  在左边导航栏挑选暂时查询,单击新建节点 >QDPS SQL。

过程6  输入节点称号,单次提交。

过程7  复制张贴实 例 SQL 句子,运转。

经过结果能够验证,经过 MaxCompute 的 SQL 命 令,对全量基线表 _base 和增量日志表_log 履行合并操作,得到某个时刻点的 snapshot 数据,确保抽取的幂等性。

举例如下,修正Iphone 12 价格。

点击履行,在暂时查询中修正时刻运转

将时刻改为17:19

改为17:20,17:21等又呈现不同的价格。

2.6.数据回溯完结

本节演示快速回溯数据的增修正生命周期。

过程1  登录 DataWorks,控制台。(https://workbench.data.alivyun.com/console)

过程2  在左边导航栏挑选作业空间列表,在作业空间列表页面,定位至已创立的作业空间,单击进入数据开发。

过程3  在左边导航栏挑选暂时查询,单击新建节点 >QDPS SQL。

过程4  输入节点称号,单次提交。

过程5  复制张贴实例 SQL 句子,运转。

经过结果能够验证,即使原表没有创立和修正时刻,也能够经过 binlog 生成时刻new_dts_sync_utc_timestamp 完结产品名为 IPhone X 的库存数据回溯。



本公司销售:阿里云、腾讯云、百度云、天翼云、金山大米云、金山企业云盘!可签订合同,开具发票。

我有话说: