实时湖仓一体在腾讯的落地实践

邵赛赛 2023-06-28 11:58:30

本文根据邵赛赛老师在〖2023 中国数据智能管理峰会-上海站〗现场演讲内容整理而成。

 

图片

 

作者介绍

邵赛赛前腾讯实时湖仓团队负责人,现Co-Founder & CTO of Datastrato。Apache基金会成员,Apache Spark Inlong Livy PMC成员,曾就职于Hortonworks、Intel,10年的大数据从业经验,专注于分布式流批计算引擎的研发和优化。

 

分享概要

一、湖仓一体技术诞生的背景和现状

二、湖仓一体技术现存的问题

三、腾讯在湖仓一体上的工作

四、后续规划

 

一、湖仓一体技术诞生的背景和现状

 

 
1.湖仓的演进

 

1)数据仓库(90s)

 

需要进行数据处理的公司在湖仓演进的架构选择上都十分相似。起初,首选方式是数仓架构,比如teradata 、greenplum或Oracle等。通常数据处理的流程是把一些业务数据库,如Transactional Database等,通过ETL的方式加载到Data Warehouse中,再在前端接入一些报表或者BI的工具去展示。

 

自Bill Inmon提出数仓概念以来,从90年代的美国到国内,数仓架构一直是一个比较经典的架构,它可以高效处理结构化的数据,而且性能好、速度快。尤其是teradata,它是存算一体的架构。

 

但是随着业务类型增多,我们需要扩展更多的业务场景,如数据科学或机器科学领域等。数据类型和数量也随之增多,结构化数据在互联网领域只占很小的一部分,还有很多半结构化、非结构化的埋点日志和音视频数据等。

 

我们的数仓已经无法处理更多数据,一些新技术,尤其是开源等多个领域的大数据技术开始涌现。

 

2)数据湖——数仓两层架构(10s)

 

我们逐渐将架构划分为数仓和数据库的双层架构,把数据先加载到数据湖中,通常我们会选择Hadoop数据库作为自建数据湖。如果要做高效的查询或者报表的输出,我们会对数据再加工,放入高性能的数仓中,如ClickHouse或Doris等。

 

大概从2010年开始,随着Hadoop的盛行,绝大多数互联网公司都在用这样的架构。大家如果使用过Hadoop,相信也能感知到它可以支持各种不同的场景,基本上能够满足所有业务场景。

 

缺点:

 

  • 在效率方面存在较大缺陷,比如数据要来回导,以ETL或者反向ETL的方式导进导出,会出现多份;

  • 一致性很难保证。

 

图片

 

3)仓、湖、流——孤岛式架构(15s)

 

这个架构整体偏离线处理,随着流式框架的引入,大公司整体的数据处理架构在2015年后就变成了仓、湖、流三种架构。

 

根据不同的场景选择不同的架构,比如我要做一些Ad-hoc的场景,我们会选择在仓里面进行;如果要做一些定时的报表或业务报表,则用Spark;如果想要做一些流式数据的查询和分析,则可以用Flink之类的工具。

 

这个架构存在几个问题:

 

  • 一致性:数据分成了三路,彼此之间天然割裂,在这种割裂的情况下,一致性是一个大问题。如果大家在公司里做一些数据处理的架构如Lambda架构等,流和批数据的对齐是一个绕不开的问题,因为数据是多份的,本质上仍是一致性问题。

  • 受限的进阶分析:如果我们在湖上做数据分析,我们缺乏一些更高阶的分析能力,比如更新、快照、ACID等语义存在缺失。

  • 数据成本:每一个通路的底层存储不同,计算也不一样,因为计算需要对应的存储来决定计算的性能,所以我们需要拷贝多份数据,成本也随之上升。

 

 
2.解决之道——湖仓一体

 

大概于20年左右提出了湖仓一体的架构,试图用一个统一的湖上建仓或湖仓一体的存储架构,解决数仓和数据库的问题。

 

针对传统意义的数据湖,若在对象存储或者Hadoop上能够构建出具备数仓语义的一个格式,使得我们在湖上的格式有更强的能力去做数仓,则需要具备几个条件:

 

  • 湖上可靠的数据管理:即需要一种开放的高性能的数据组织方式。采用传统方式定义表时,缺乏一种高效的表的组织方式。我们通常用 Hive表,它就是一个目录,没有特殊的能力。我们需要一种更高效的组织能力,兼顾一些仓的特性。

 

  • 支持机器学习和数据科学:湖仓一体的技术需要有一套开放的标准或者开放的接口。大家在用数仓的时候,会发现它是存算一体的数仓,存储就是为了计算所定制。虽然性能很好,但不开放,也就是所有的生态都要建立在上面,但数据湖则是天然开放,Flink和Spark等其他引擎都能使用这些数据。

 

  • 最先进的SQL性能:若湖仓一体只是湖,那么很轻易就能办到,但是它的性能会比较差。如果要使表具备仓的性能,比如能够匹敌类似Snowflake或者Redshift这样的性能,则需要一个高性能的SQL引擎,这也是Databricks做了Photon引擎的原因,有了这些,我们就可以真正在湖上构建出一个高性能的数仓,也就是“湖仓一体”。

 

图片

 

 
3.三种主流开源技术

 

前文讲述了湖仓一体技术所要具备的几个特性,如今在开源领域主要有三种技术拥有这些特性,分别是:Hudi、Iceberg和Delta Lake。

 

它们的功能整体上比较接近,都是一种数据的组织方式,即定义了一种表的格式,这个格式主要是定义数据的组织方式,而不是确定一种数据的存储格式。与一些纯粹的数据格式或Hive表(Hive 3.0版本前)相比,它提供了ACID事务能力,这样就具备了仓的能力,它可以提供一些事务的特性和并发能力,还可以做行级数据的修改、表结构的修改和进化,这些都是传统大数据格式难以完成的事项。湖仓一体技术出现后被业界迅速采用,从21年开始就进入了Gartner技术成熟度曲线的评估。

 

 
4.湖仓一体技术的优势

 

  • 优化数据入湖流程:相比传统的成熟形态,比如T+1的入仓形态或者入湖的形态,它可以用T+0的高效的流式入湖形态,大大降低了数据的可见时延。

  • 支持更多的分析引擎:它是开放的,所以能够支持很多引擎。我们内部也对接了很多不同的引擎,包括Flink、Spark 、Presto和StarRocks等。

  • 统一数据存储和灵活的文件组织:采用比较灵活的文件组织方式,具备了一些额外的特性,使得流和批都可以用这种文件组织方式进行消费。

  • 增量读取处理能力

 

图片

 

 
5.湖仓一体落地场景

 

1)加速数据入湖

 

下图左侧是我们一个旧的数据管道。举个例子,要收集一些Spark的审计日志以观察每天的情况,那么我们就可以把Spark日志都导入到消息队列中。在腾讯内部使用的是TubeMQ,然后我们有一个服务TDSort用于归档,把数据按照小时或者天的时间格式分类,紧接着保存至HDFS上,再启动一个Hive的命令,把它添加到分区内。

 

图片

 

前面是通过流式进入,后面是批的落盘,整体设计比较复杂。为了保证exactly-once以及保证流转批的可见性,我们在原子性上花了很多心思,因为在原先的架构上我们缺乏事务的能力,所以我们通常依赖HDFS的原子性来保证可见性。

 

之后我们把整体架构迁到了以数据湖格式为体系的另一套架构中,选择用Flink来做流式的入湖,把它写到HDFS上,这样整体链路就变得更为简单。对于Flink写下的数据,我们主要选择的是Iceberg,在Flink读取把它写到Iceberg中,下游就能直接可见。

 

至此,原先T+1的可见性就变成T+0,这个是最典型、最常见的一种使用方式。这也是我们内部像广告和视频号等业务的主要使用方式,把小时级的数据可见性降低到分钟级的可见性。

 

2)构建CDC Pipeline

 

CDC在腾讯内部不算是非常大的场景,但原本通过拉链表方式去构建,会带来一些问题:一是延迟,二是后续的处理流程非常复杂。

 

我们现在改成了另一种方式,使用Flink的CDC Connector,再加上Hudi。因为针对CDC而言,Hudi在这方面的能力比Iceberg更成熟,所以选用Hudi而不是Iceberg。

 

有两种方案,一种方案是直连MySQL或PostgreSQL等类似的数据库,另一种是通过消息队列的方式,通常都是使用第一种方式,这也是比较常见的一种内部形态,与前面相比Flink CDC connector与MySQL直连获取binlog。

 

图片

 

3)近实时的流批一体架构

 

在业务侧使用整套湖仓一体技术后,从原先的Lambda架构转换成了湖仓一体的架构。在原先的架构中,流和批分离,流主要是用消息队列来做流式的Pipeline的构建,还有一条离线链路做数据的回补和对账等。但是离线存在于HDFS上,这样就会导致两条链路要做同一份数据的处理。

 

使用湖仓一体就相当于把它们合并,我们在ODS、DWD或者DWS层统一用Iceberg来进行流式写入。在流式写入后,可以在每一层中做离线或者批的分析,也可以一直做流式分析,因此同一份数据既做到了流式的读和写,又做到了批的读和写,一份数据就可以适配整个场景,不需要存多份数据或者接多条ETL Pipeline。这就是我们比较典型的一个架构,腾讯视频也是在这个架构基础上做演进。

 

图片

 

4)更好的Hive表

 

回到湖仓一体的本质,即使我们不需要上述的特性,相比传统的Hive表,它也带来了很多新的特性和能力。用于取代离线的场景化,也会有更好的效果。

 

数据治理:

 

  • 支持表结构进化:Hive的其中一个特性就是分区,在建表的时候就需要指定分区字段,同时在查询时也必须加上分区的过滤条件,否则它有可能去查所有的分区,造成大量数据的误读取。分区一旦定下来就很难变动,但Iceberg是隐式的分区,通过它的表达式来做分区的映射和转换,就可以对分区做出调整,比如原先是按月来分区,你可以把它更改成按天分区。

 

  • 支持行级数据的修正:原先Hive表的一个常见思路是用覆盖写的方式,要做数据修正时就要覆盖一个分区,但你可能只有一行数据需要调整。湖仓一体的格式提供了行级的修正能力。提供两种修正,一种是Copy On Write的修正,还有一种是Merge On Read的修正,降低了修正的代价,大大提高了它的实时性。

 

数据查询:

 

  • ACID能力:Hive依靠HDFS的原子性来保证它的可见性。比如你Insert到多个分区时,Insert涉及到跨多目录复制,则无法原子性,这时你一边 Insert一边去查询的时候就会读到脏数据,Iceberg、Hudi都是通过快照机制进行查询,快照只有被commit了以后才可见,所以这时并发地读和写数据,不会出现任何问题。

 

  • 高效的data skipping能力:像这种新的表格式,它会增加一些额外的能力,比如z-ordering的data skipping的能力,使得你能更高效地做多维数据分析。即使没有实时的需求,只想替换Hive表,那么用湖仓一体这些新的表格式也能给你带来更好的效果。

 

图片

 

二、湖仓一体技术现存的问题

 

 
1.湖仓一体内核的性能

 

随着湖仓一体实践的逐渐深入,尤其是当单链路的数据量达到分钟级,每日达到万亿规模时,湖仓一体的性能问题就要格外重视。

 

1)数据治理问题

 

  • 海量小文件:我们主要用Iceberg,它每次commit时都会生成大量文件,你要求的commit时间越短,它的小文件就会越多,几天过去,这张表的小文件数可能达到几百万,甚至上千万,这个时候再去查询,Query Plan就会跑不动,变得非常慢。

  • Query Plan时延:Iceberg保存了多副本,每一次commit都会产生一个元数据的快照,快照里面包含了很多信息,元数据的数量将越来越大。如果未做一些元数据的清理或者合并,那么只是生成执行计划就需要大量耗时。我们内部的广告系统在使用,它是一个复杂类型,大概有几千列的表结构的查询和嵌套类型的复杂字段。Iceberg未优化的时候,Query Plan甚至要十几分钟。

 

2)查询性能问题

 

  • 平衡读写性能:写和读的对于性能的要求不同,如何能够平衡写和读是非常重要的一个问题。

  • 发挥极速性能:Iceberg和Hudi很多高阶的特性,比如索引之类,我们内部也进行了大量建设。

 

3)流批一体

 

批处理希望能够有更多的数据块聚合在一起读取,做到更多样、更大的吞吐,流则需要更快的响应。

 

图片

 

 
2.湖仓一体技术的实时性限制

 

抛开内核,无论是Iceberg还是Hudi,本质上都是海量文件的组织方式,无法摆脱存储的限制,我们通常会把它存到内部的HDFS上,云上则会存到对象存储中。但对象存储也有它的限制,吞吐量较大,但延迟会较高。

 

如果需要流读,我们通常在构建实时链路的时候,会选择消息队列,它的存储模型完全不同,是低延迟高响应,顺序读写。它的存储能力决定了计算,流式计算的访问方式和离线计算的访问方式不同。

 

这个时候就会出现两个问题:

 

  • 如何平衡流式的访问和批的访问?既能做到高性能和高效,又能做到低成本?

  • 传统的Iceberg和Hudi,实现分钟级已经接近极限,如果继续加速该如何优化?

 

图片

 

三、腾讯在湖仓一体上的工作

 

 
1.内核优化

 

1)功能优化

 

  • 大宽表支持:主要针对广告,因为广告需要不断加入新的特征,随着添加的特征越来越多,表就会变得越来越宽。同时,它原来使用PB的格式,所以它有很多嵌套,现在把它转成Iceberg,就变成了一个极大的宽表,无论对于写入还是查询,都极具挑战。

  • 跨源查询支持:因为内部有旧表、新表以及不同的系统,所以需要实现跨源以及高性能的查询。

  • 流转批:我们绝大多数的链路仍是批,为使在流式写入时下游能够具有批的可见性,我们增加了Watermark机制来进行流转批。

  • 流式写入支持去重、增量读取、流量控制:我们不断改进流式写入能力,尤其是对于在Iceberg上做CDC的写入,部分列的更新等,做了很多改进。

 

2)性能优化

 

  • 元数据读取加速, 引入Alluxio:引入Alluxio,把元数据缓存在Alluxio上,加速它的访问,对并行的元数据的Query Plan、压缩格式等也做了一些调整,实现加速;

  • 复杂类型列剪支优化, 基于列信息任务切分优化;

  • V2表layout改进与合并加速;

  • 向量化,Async-IO,CBO等查询加速。

 

总体来看,设计出这些特性后,测试数据显示,我们内部的TDW与Spark相比,性能大大提升。

 

图片

 

 
2.二级索引

Snowflake或者Redshift之所以那么快,很重要的一点是因为它有索引,但我们传统的Hive表几乎没有索引。Iceberg具备了构建索引的能力,也具有ACID能力,而且它的表结构也更复杂,所以我们能够构建索引。

 

具体成果:1)引入一个索引框架;2)构建了不同类型的索引。

 

我们做的是全局索引,针对每个Data File生成对应的Index File。Index file与datafile绑定,内部有一套系统会异步更新或者生成Index。我们选择Puffin作为存储的格式,它是Iceberg定义的一种Index的存储格式。我们也改造了一定的语法,使得它能够支持索引的生成。

 

图片

 

整体完成后,我们有一个点查的场景,bloom filter就比较适合点查的场景,速度与原来相比有一个数量级的提升。

 

图片

 

 
3.流批一体的实时湖仓架构

 

我们在使用湖仓一体技术的时候,流式的性能已无法实现突破,因为受制于底层的存储,使用HDFS或者对账存储则缺乏更低的延时,所以我们也在参考社区的方案。

 

Flink社区提供了一个Flink Table Store的方案,把流存储和批存储融合为一体,现在改了名字,叫做Paimon,我们参考其做了类似的方案。在这个方案中,流和批选择了不同的存储,流选择使用消息队列,批则是底层使用数据湖的格式,封装在一起就成为了流批表。有了流批表,则能够对外提供统一的流和批的读写接口。

 

我们主要是对接Flink的场景,写的时候我们会双写到LogStore和Filestore这两个系统中,根据不同的场景读不同的系统。如果是流式则读LogStore,批则读Filestore。

 

优点:

 

  • 引擎和表的流批一体,降低业务架构复杂度:存储在形态上可以看成近似的统一体,未来也希望能实现真正的统一。

  • 屏蔽流批差异,统一SQL操作:我们把Flink和流批对接后,就可以在Flink上提供流和批的处理能力,只需要使用同一套引擎。

  • 提升时效性,兼顾流式和湖仓:因为流写到了消息队列中,所以流的性能提高,速度加快,能实现秒级的时效性。

 

图片

 

 
4.自动数据治理

 

我们引入了自动数据治理的概念,它与传统的数据治理方式的区别在于它基于事件驱动,而不是基于时间定时完成。其具备以下能力:

 

  • 做文件的聚合,包括排序聚合和zordering聚合;

  • 可以做行级或者列级的生命周期的管理;

  • 自动的索引、缓存和排序等。

 

具体的运作步骤:它会在Iceberg的存储中收集一些事件,根据事件分析当前要进行的操作,然后根据规则来生成这些操作。

 

图片

 

1)小文件合并

 

在做小文件合并时,如何生成这些规则?

 

传统意义上的小文件合并,通常来会设定一个时间点,比如每隔一小时或者每隔一天做一次,但这样会产生很多无效的作业。若你的写入很快,那么可能会有大量的堆积,若你写入很慢,那么就可能有很多无效的合并操作。

 

我们通过收集每一次commit后写入的增量,求均方差,判断当前是否达到阈值。若未到阈值,我们会逐步更新它的均方差。如果达到阈值,就会触发一个小文件的合并操作,根据事件来驱动。这样的形式会比先前的方式更能节省资源,效率也更高。

 

图片

 

2)自动重分布优化

 

现在社区也有,但我们更早开始,它主要是能够做到加速多维查询,把相关的record归类放在一起。我们会通过事件收集相关性极高常被查询的列,自动给用户推荐可以重排列的数据,并询问是否需要重排列。当用户决定重排列,数据就会进行增量,做后续的重排列,这样就能提高数据整体的有效过滤率。

 

图片

 

3)自动索引

 

我们对Iceberg引入了一个索引框架,支持bloom filter 和 bitmap的构建,但是用户并不知道如何使用索引。所以我们提供了自动索引的构建能力,会根据查询的信息分析出哪些列的用户查询频度较高,接下来我们会优先在这些列上构建索引。同时,我们选择了根据分区的增量来加theta sketch的方式来做增量的索引,而不是每次都做全表索引的重构。构建索引后,Iceberg的常用性能会出现一个大的跃升。

 

图片

 

四、后续规划

 

我们希望湖仓建设从原先的准实时湖仓向实时湖仓的架构迈进,也希望湖仓一体架构在经过元数据、缓存和索引的优化后,能够解决交互式查询和流的所有场景问题,用一套存储应对所有的场景。这是我们现在在做的事情,也是未来的目标。

 

Q&A

 

Q1:前面提及CDC的构建,是按照整库入仓还是按表的方式来进行?

 

A1:我们腾讯这边的量不算大,我们内部主要还是以append方式入湖,CDC则仍是按表的方式来,没有做太多的优化,也没有涉及整库的方式。

 

Q2:您提到小文件合并,具体的优化是指要另起一个旁路作业,还是指将这部分的功能并入到写入的流程里?

 

A2:我们采取离线和异步的方式,因为如果并入到写入的流程,会对整体写入造成拖垮或者堆积效应,所以根据我们内部的实践以及单链路1000多亿的日均写入的经验,同步写入和合并的这种方案并不可行,所以我们做的是异步方案。

 

Q3:有些场景会选择Hudi,另外一些场景选择Iceberg,请问Iceberg和Hudi的选型依据是什么?

 

A3:我们八成以上的场景都选择了Iceberg,因为我们投身及使用Iceberg社区的时间较早,所以对Iceberg的的整体把控会更好。只有涉及CDC的场景,我们才会用Hudi,因为Iceberg当前的CDC能力不够成熟,但我们也在探索和建设Iceberg的CDC能力,包括全局索引的能力、部分列的更新能力等,也是为了全链路CDC所做的优化。如果未来Iceberg具备这样的能力,我们应该会统一使用Iceberg,因为维护多套系统会增加维护的成本。其实这两个技术没有太大差别,只需选择一种即可,实际上社区的演进最终都会趋同。

 

Q4:Iceberg上有Spark和Flink等多个引擎,假如我建了一个Iceberg表,可以用Spark和Flink两种引擎同时访问底层的表吗?

 

A4:可以。因为它有所谓的事务的语义。这也取决于你的锁如何实现,默认使用比如HiveLock等可以做隔离,所以能够多引擎地去写,但会有一定的冲突概率。但针对读而言,因为Iceberg生成的每一个副本都是只读的,所以多引擎去读没有任何问题。

 

Q5:数据湖在应用侧的使用场景有哪些?

 

A5:数据湖从20年初引入到现在,在腾讯内部每年至少有10倍以上的规模增长,所以现在几乎所有的业务线都在使用。最大的业务线一般是视频号或者广告之类,也有其他的业务,基本上所有的业务都在用数据湖,无论是用于加速数据的可见性、构建CDC还是用Iceberg替代Hive表的低效查询,都会带来一定的性能提升,这些场景前文有所提及。

最新评论
访客 2024年04月08日

如果字段的最大可能长度超过255字节,那么长度值可能…

访客 2024年03月04日

只能说作者太用心了,优秀

访客 2024年02月23日

感谢详解

访客 2024年02月20日

一般干个7-8年(即30岁左右),能做到年入40w-50w;有…

访客 2023年08月20日

230721

活动预告