Feb
18
2022
使用 Flink Hudi 构建流式数据湖平台
一、Apache Hudi 101
修改搜图
湖存储成本低、灵活性高的特性,十分适用于做查询场景的中心化存储。伴随着近年来云服务的兴起,尤其是方针存储的老练,越来越多的企业挑选在云上构建存储服务。数据湖的存算别离架构十分适合其时的云服务架构,经过快照隔离的方法,提供根底的 acid 业务,一起支撑对接多种剖析引擎适配不同的查询场景,能够说湖存储在成本和开放性上占了极大优势。修改搜图
其时的湖存储现已开端承担数仓的功用,经过和核算引擎对接完成湖仓一体的架构。湖存储是一种 table format,在原有的 data format 根底上封装了 table 的高档语义。Hudi 从 2016 年开端将数据湖投入实践,其时是为了处理大数据场景下文件体系上的数据更新问题,Hudi 类 LSM 的 table format 其时在湖格局中是独树一帜的,对近实时更新比较友好,语义也相对完善。Table format 是其时盛行的三种数据湖格局的根底属性,而 Hudi 从项目之初就一向朝着渠道方向去演化,拥有比较完善的数据治理和 table service,比方用户在写入的时分能够并发地优化文件的布局,metadata table 能够大幅优化写入时查询端的文件查找功率。下面介绍一些 Hudi 的根底概念。修改搜图
Timeline service 是 Hudi 业务层的中心笼统,Hudi 一切数据操作都是围绕着 timeline service 来打开的,每次操作经过 instant 笼统绑定一个特定的时刻戳,一连串的 instant 构成了 timeline service,每一个 instance 记录了对应的 action 和状况。经过 timeline service,Hudi 能够知道其时表操作的状况,经过一套文件体系视图的笼统结合 timeline service,能够对 table 其时的 reader 和 writer 露出特定时刻戳下的文件布局视图。修改搜图
file group 是 Hudi 在文件布局层的中心笼统,每一个 file group 相当于一个 bucket,经过文件巨细来来划分,它的每次写入行为都会产生一个新的版别,一个版别被笼统为一个 file slice,file slice 内部保护了相应版别的数据文件。当一个 file group 写入到规定的文件巨细的时分,就会切换一个新的 file group。Hudi 在 file slice 的写入行为能够笼统成两种语义, copy on write 和 merge on read。修改搜图
copy on write 每次都会写全量数据,新数据会和上一个 file slice 的数据 merge,然后再写一个新的 file slice,产生一个新的 bucket 的文件。修改搜图
而 merge on read 则比较复杂一些,它的语义是追加写入,即每次只写增量数据,所以不会写新的 file slice。它首要会测验追加之前的 file slice,只有当该写入的 file slice 被归入紧缩计划之后,才会切新的 file slice。二、Flink Hudi Integration
修改搜图
Flink Hudi 的写入 pipeline 由几个算子构成。第一个算子担任将 table 层的 rowdata 转换成 Hudi 的音讯格局 HudiRecord。接着经过一个 Bucket Assigner,它首要担任将现已转好的 HudiRecord 分配到特定的 file group 中,接着分好 file group 的 record 会流入 Writer 算子履行真实的文件写入。最后还有一个 coordinator,担任 Hudi table 层的 table service 调度以及新业务的建议和提交。此外,还有一些后台的整理人物担任整理老版别的数据。修改搜图其时的设计中,每一个 bucket assign task 都会持有一个 bucket assigner,它独立保护自己的一组 file group。在写入新数据或非更新 insert 数据的时分,bucket assign task 会扫描文件视图,优先将这一批新的数据写入到被判定为小 bucket 的 file group 里。比方上图, file group 默许巨细是 120M,那么左图的 task1 会优先写到 file group1和 file group2,留意这儿不会写到 file group3,这是因为 file group3 现已有 100M 数据,关于比较接近方针阈值的 bucket 不再写入能够避免过度写扩大。而右图中的 task2 会直接写一个新的 file group,不会去追加那些现已写的比较大的 file group 了。修改搜图
接下来介绍 Flink Hudi 写流程的状况切换机制。作业刚发动时,coordinator 会先测验去文件体系上新建这张表,假如其时表不存在,它就会去文件目录上写一些 meta 信息,也便是构建一个表。收到一切 task 的初始化 meta 信息后,coordinator 会敞开一个新的 transaction,write task 看到 transaction 的建议后,就会解锁其时数据的 flush 行为。Write Task 会先积攒一批数据,这儿有两种 flush 策略,一种是其时的数据 buffer 达到了指定的巨细,就会把内存中的数据 flush 出去;另一种是当上游的 checkpoint barrier 抵达需求做快照的时分,会把一切内存中的数据 flush 到磁盘。每次 flush 数据之后都会把 meta 信息发送给 coordinator。coordinator 收到 checkpoint 的 success 事情后,会提交对应的业务,而且建议下一个新的业务。writer task 看到新业务后,又会解锁下一轮业务的写入。这样,整个写入流程就串起来了。修改搜图
Flink Hudi Write 提供了十分丰富的写入场景。其时支撑对 log 数据类型的写入,即非更新的数据类型,一起支撑小文件兼并。别的关于 Hudi 的中心写入场景比方更新流、CDC 数据也都是 Hudi 要点支撑的。一起,Flink Hudi 还支撑历史数据的高功率批量导入,bucket insert 模式能够一次性将比方 Hive 中的离线数据或者数据库中的离线数据,经过批量查询的方法,高效导入 Hudi 格局中。别的,Flink Hudi 还提供了全量和增量的索引加载,用户能够一次性将批量数据高效导入湖格局,再经过对接流的写入程序,完成全量接增量的数据导入。修改搜图Flink Hudi read 端也支撑了十分丰富的查询视图,现在首要支撑的有全量读取、历史时刻 range 的增量读取以及流式读取。修改搜图
上图是一段经过 Flink sql 写 Hudi 的例子,Hudi 支撑的 use case 十分丰富,也尽量简化了用户需求装备的参数。经过简略装备表 path、 并发以及 operation type,用户能够十分方便地将上游的数据写入到 Hudi 格局中。三、Flink Hudi Use Case
修改搜图
第一个经典场景是 DB 导入数据湖。现在 DB 数据导入数据湖有两种方法:能够经过 CDC connector 一次性将全量和增量数据导入到 Hudi 格局中;也能够经过消费 Kafka 上的 CDC changelog,经过 Flink 的 CDC format 将数据导入到 Hudi 格局。修改搜图
第二个经典场景是流核算的 ETL (近实时的 olap 剖析)。经过对接上游流核算简略的一些 ETL,比方双流 join 或双流 join 接一个 agg,直接将改变流写入到 Hudi 格局中,然后下游的 read 端能够对接传统经典的 olap 引擎比方 presto、spark 来做端到端的近实时查询。修改搜图
第三个经典场景和第二个有些类似, Hudi 支撑原生的 changelog,也便是支撑保存 Flink 核算中行级别的改变。基于这个才能,经过流读消费改变的方法,能够完成端到端的近实时的 ETL 出产。修改搜图未来,社区两个大版别首要的精力仍是放在流读和流写方向,而且会加强流读的语义;别的在 catalog 和 metadata 方面会做自管理;咱们还会在近期推出一个 trino 原生的 connector 支撑,取代其时读 Hive 的方法,进步功率。四、Apache Hudi Roadmap
履行两条 sql 句子今后,两条 catalog 就创立成功了。
修改搜图
接下来到作业开发页面创立一个千表入湖的作业。只需求简略的 9 行 SQL,第一种语法是 create database as database,它的作用是把 MySql benchmark1 库下一切的表结构和表数据一键同步到 Hudi CDS demo 库,表的关系是一对一映射。第二条语法是 create table as table,它的作用是把 MySql benchmark2 库下一切匹配 sbtest. 正则表达式的表同步到 Hudi 的 DB1 下的 ctas_dema 表里面,是多对一的映射关系,会做分库分表的兼并。接着咱们运转并上线,然后到作业运维的页面去发动作业,能够看到装备信息现已更新了,说明现已从头上线过。接着点击发动按钮,发动作业。然后就能够到作业总览页面查看作业相关的状况信息。修改搜图
上图是作业的拓扑,十分复杂,有 1100 张源表和 101 张方针表。这儿咱们做了一些优化 —— source merge,把一切的表兼并到一个节点里,能够在增量 binlog 拉取阶段只拉取一次,减轻对 MySql 的压力。修改搜图
接下来改写 oss 页面,能够看到现已多了一个 cdas_demo 途径,进入 subtest1 途径,能够看到现已有元数据在写入,表明数据其实在写入过程中。再到作业开发页面写一个简略的 SQL 查询某张表,来验证一下数据是否真的在写入。履行上图 SQL 句子,能够看到数据现已能够查询到,这些数据与插入的数据是共同的。咱们利用 catalog 提供的元数据才能,结合 CDS 和 CTS 语法,经过几行简略的 SQL,就能轻松完成几千张表的数据入湖,极大简化了数据入湖的流程,降低了开发运维的工作量。
本公司销售:阿里云、腾讯云、百度云、天翼云、金山大米云、金山企业云盘!可签订合同,开具发票。
我有话说: