大家好,我叫汤楚熙,来自美团数据平台中心的计算平台团队,当前主要工作内容是实时数仓平台的研发。今天和大家分享一下实时数据在美团的典型应用场景,实时数仓建设中的挑战和解决方案,包括一些关键的设计细节。主要介绍以下几方面内容:
建设背景
平台架构设计
平台建设实践
未来计划
一、建设背景
美团作为本地生活领域的头部公司,在内部孵化了许多独立业务,可以看到有大家所熟悉的美团外卖、酒店、美团优选等,这些业务通过实时数据来支撑其内部各种各样的数据应用场景,比如BI、算法、骑手调度等等。
我们对业务场景做了一个简单的分类:
指标监控:比如有实时大盘,用来即时反馈业务当日运转的健康度等场景;
实时特征:比如搜索、广告CTR预估、骑手调度等,对算法特征数据新鲜度要求较高的场景;
事件处理:比如一些风控类、运营活动发券等事件驱动型场景;
数据对账:比如金融的支付业务,支付部门与业务部门各自独立,当业务部门的支付单据与支付部门不一致时,会造成资损,这时数据的实时对账就非常关键。
上图可以看到,截至目前,实时计算平台所支撑的实时数据处理场景的整体规模,说明实时数据在美团已经影响到了业务的方方面面。
实时计算平台从成立以来,经历了上图中的几个关键发展阶段。平台正式成立于2014年,我们引入Storm和Spark Streaming作为美团的第一代实时计算引擎,并且发布了第一版作业托管平台。接下来在2017年,平台正式引进了Flink,并开始初步探索以Flink SQL为主的实时数仓开发方式。并于2019年,正式将Flink SQL作为主要编程接口暴露给业务,将以任务为中心的开发模式,升级为以数据为中心的开发模式。当前,计算平台紧跟业界发展潮流,将工作内容都聚焦在数仓增量化生产、流批语义统一、统一实时离线数仓建模方式等几个方向上。
在正式开始介绍数仓平台的建设实践之前,先来回顾下平台初期所遇到的问题。实时数据开始建设之初,是没有离线数仓那样成熟的建设方法论的,而且也没有离线数仓领域那样成熟的开发工具,所以带来了以下几点问题:
首先就是高昂的开发运维成本,每次计算框架的升级,业务都需要学习一遍计算框架的API。
代码本地开发,再去线上调试,本地的case难以覆盖线上的数据问题。
业务各自的数据协议不统一,相互之间进行数据交换,沟通协作的成本也是比较高昂的。
数仓的建设方式没有统一规范,导致数据的冗余和重复建设,给后期的资源治理带来了非常大的麻烦。
从上面的问题出发,我们制定了平台的建设路线。主要集中在两个层面,首先是降低业务的开发运维门槛,让实时数仓开发可以像离线数仓开发那样简单高效。比如我们提供了标准的ETL作业模板,web集成开发环境,并且扩展了SQL的能力,使业务可以尽量以符合其认知的形式去进行代码开发。还有数仓建设中业务最关心的数据质量问题,我们也提供了相应的配套工具,帮助业务以尽可能低的成本将可靠的数据交付应用方。可用性在离线数仓建设过程中可能大多体现在数据是否按时就绪,那么实时数仓对数据的时延要求更高,所以可用性的保障也非常关键。
前面提到的都是在开发运维效率方面我们所做的一些建设规划,在大数据领域,一个底层算子性能的小小改进,都会使执行效率成倍的放大,所以我们也会花费一些精力在底层算子的优化上。
两横三纵,其中两横包括开发迭代效率,面向人的优化,重点在于对工作流。三纵包括能做(看得见、摸得着的问题)、做好和最优化。
二、平台架构设计
接下来开始着重介绍我们是如何解决上面所提到的问题的。首先从整体上来介绍下平台解决上面问题的思路。
上图是平台整体架构。从下向上来看,存储、计算、调度加上日志服务构成了我们的基础服务层。基础服务层之上是平台对业务提供的一些中间件。上层是平台抽象出的一些可自行组合的微服务集合,比如作业模板服务、UDF托管服务、元数据服务、指标采集监控、数据质量管理等,这些服务业务可以按自身的场景需要来在自己的业务内部自行组合,也可以直接使用平台包装好的大而全的集成开发平台。
上图展示了平台基础服务中最关键的计算服务的选型过程。实时数仓场景的最根本业务诉求是数据的时效性,这里的时效性通常指的是秒级的延迟,所以这里Flink和Storm胜出。其次是数据的正确性,Flink是这里唯一能够保证Exectly-Once计算语义的框架,所以Flink要优于storm。之后我们有做了benchmark测试,通过实验证明了,在绝大多数场景下Flink任务的吞吐要优于Storm,而且Flink还提供了更加成熟的SQL编程接口,所以我们最终确认选择Flink作为实时数仓的核心计算框架。
解决了计算框架的问题,接下来我们要从上层概念入手,让熟悉离线数仓开发的同学能够更快的上手实时数仓的开发。
从下向上看,我们先统一了离线和实时数仓的数据模型,无论是HiveTable、KafkaTopic,Redis的一个域,在上层暴露给业务的都是一张Table,这样业务没有过多认知上的负担了,可以在不同开发场景的概念之间轻松切换。
从上向下看,我们又统一了编程接口,使用SQL作为数仓开发的首选,这样实时和离线数仓的ETL逻辑甚至可以完全共用一套,对开发效率上也有显著的提升。
有同学可能会问,实时和离线场景的计算语义不完全相同,实时计算场景需要包含大量跟时态相关的语法,比如window,interval等,离线场景上没有,那么怎么统一呢?
的确如此,所以我们独立出一套SQL服务,短期用户也可以在SQL中加入HINT提升或者是直接提供一些参数,来告诉我们这是什么离线还是实时场景的ETL,未来我们会自动根据业务的输入、输出表的存储类型,ETL的模式,自动判断使用哪种类型的执行模式更有效。
跟社区如果对不齐怎么办:先对内解问题,如果效果真的不错,可以推回社区,如果社区有更好的方案,我们可以判断是否能够merge进来,如果不行,说明我们的架构设计本身就是有问题的。
三、平台建设实践
我们对数仓平台的定位是:集需求准备、开发测试、发布和运维监控能力的一站式实时数仓生产解决方案。
下面简单来介绍一下用户在平台上的工作流程。
在需求准备阶段,用户可以结合业务需求先来检索是否有满足需求的数据模型,如果没有找到,那么可以选择从源头开始接入,或者新建模型。模型接入或创建好之后,进入ETL开发阶段,开发过程可能会伴随着一些简单的任务调试,这些工作也全部都可以在平台上完成。在开发完成准备上线之前,用户可以创建一条发布流水线,这块内容后面还有详细的介绍,待流水线执行通过后,就可以正式发布作业了,作业上线后,平台会自动收集作业的运行时指标,用来监控作业的运行状态。
下面介绍下,平台是如何规范业务的数仓接入流程的。从上图(左)大家可以看到,跟离线数仓的入仓流程相比,在没有数仓平台前,实时数仓的入参过程突出了一个乱字,而这样会带来如下问题:
数据建设过程没有规范,后面接手的同学不知道从何入手。
接着上面的问题,如果后面同学按照自己的理解,重新接入一遍数据,长此以往,会造成大量的冗余数据,造成烟囱林立,资源浪费,后面还需要花大量的时间治理。
数仓接入这个动作本身是没有过多业务逻辑的,是可以标准化和系统化的,这样重复机械的工作内容,会造成人力资源的浪费。
面对上述问题,数仓平台提供了一套完整的实时数仓接入方案。明确的帮用户生成ODS层,这样同项目成员之间的合作,有了共同的规范和约束,不会再有因信息未对齐而造成的数据重复接入。我们不光帮助用户规范了入仓的流程,还提供了一系列数据正确性、作业稳定性的保障机制,使业务同学可以将精力集中在数仓的建设上。
在规范化业务数仓接入流程的方案设计过程中,有个小小的挑战,那就是我们的数据源并不仅仅来自MySQL binlog和nginx日志,还有大量业务自行通过SDK上报的日志,这些日志的格式难以从整体上进行抽象,而且不同业务因为服务场景不同,数据的序列化方式也难以统一,所以我们抽象出一个Adapter模块,专门用来解决这个问题。
Parser用来适配业务自定义消息格式,Formatter将用来监控作业稳定性和数据正确性的元数据信息融入到消息中,最后按照业务场景的实际诉求,允许业务根据自身场景定义序列化方式。
前面介绍了平台在数据入仓阶段如何提升开发效率。接下来介绍如何帮助业务更低门槛的进行实时数据的开发。
平台上线之初是基于Flink1.9的SQL实现的模板任务,在当时来看,他们的能力并不成熟,一些在离线场景比如SparkSQL和HiveSQL都支持的语法,在Flink上支持的并不好。所以我们决定先由平台自行根据业务需求对语法进行扩展。比如table、view、UDF的声明,还有insert into等语法的支持。当然这并不是全部。
我们不仅仅在SQL语法层面进行了改进,还对作业模板进行了增强。像watermark提取对业务时间格式有一些要求,这种业务场景强相关的逻辑并不适合直接写死在我们的模板程序代码中。所以我们在作业模板中加入了几个切面,可以由业务自行上传代码来扩充这部分能力。比如我们会在Source注册之后,提供一个切面,引入用户代码,进行日期格式转换,再执行SQL。官方虽然已经提供了计算列,我们也调研了相关能力,但是我们认为除非有一个数量级的开发效率优化效果,否则我们没必要一定follow官方的语法。
经过对模板的升级改造,可以看到平台能够支持的ETL模式已经非常丰富了,后面我们也会继续迭代,目标是可以覆盖95%以上的实时ETL场景。
UDF是一种扩展SQL表意能力的重要功能,在没有平台的时候,用户UDF都是散布在各自的代码仓库中的,这样一些较通用的UDF,不能被其他业务直接使用,业务在代码中执行一些有安全风险的行为,平台也无法有效管控。
所以我们建设了一套UDF托管服务,帮助业务集中托管UDF代码,可以编译打包时,进行提前检查、并暴露安全风险,而且通用UDF可以在业务之间共享,也能够帮助业务提升开发效率。
前面的内容,主要是如何解决开发效率的问题,下部分内容的重点是,如何保证业务的数据质量。可能在场的各位同学,有后台开发的相关经验,大家可能都了解Devops方法论的核心目标,是保证迭代效率和工程质量。实时数据开发其实与后台服务开发过程有相似的地方,作业发布后,数据就会立即生效,并作用于线上,所以我们也需要一套流程,来保证我们每次实时任务发布的数据质量不用影响到我们的数据服务质量。
我们设计了一套数仓发布的Pipeline,在每次任务迭代上线过程都会执行一次Pipeline过程,TestCase就类似于单测用例,理论上所有TestCase都通过才可以发布作业。
Pipeline服务是通过一个异步任务调度框架来实现的,每个Worker内会启动一个Flink的MiniCluster进程,执行后会将结果存入DB并在前端打印执行结果。
对于数据质量,业务还有一项非常关心的事项,也就是数据的时延。时延一方面可以说明业务交付的数据是否符合应用方的预期,另一方面也方便业务自己去排查问题,确定作业的性能瓶颈点。
Flink官方提供了一个用来计算延迟情况的机制,Latency Marker,这个东西近似于Watermark,是一类与业务数据无关的,由框架周期性产生的消息,我们要做的是根据业务的流量和业务延迟时间精度的要求,控制这类消息的发送频率和发送量,并支持跨任务传递Marker。因为平台收口了数仓接入层,所以这也使我们获取到真正的端到端延迟成为可能,我们会通过emitter向下游发送特殊的消息协议,并且下游任务的Reciver会对这类消息做特殊判断,在发送和接受数据时都会将指标上报到Raptor,即美团内部的一个业务指标监控,并最终提供给业务。
前面我们分别介绍了平台是如何提升开发效率和保证业务的数据质量的,主要解决的是数仓开发者的问题。平台还有一类用户是数仓架构师,他们不仅仅要参与数仓的建设,还需要对数仓的建设情况做整体性把控。
以前大家都是通过wiki的形式来进行数仓的规范和约束。数仓平台提出了一个项目空间的概念,每个项目空间都可以由架构师定义符合自身业务场景的一些约束项,比如架构师可以定义数仓的主题、分层规范,表、字段的命名规则,同项目空间下的实体都必须遵守负责人做定义的规范。这样可以在开发之前就保证数仓的建设质量。
下面来分享下我们在flink算子层面所做的一些优化工作。
首先实时数仓有很大一部分计算场景是用来做扩维的,也就是流表关联。流数据来自kafka,表数据通常是借助redis、hbase等分布式kv存储,当流量小的时候,每一条流数据都请求一次外存,发起一次网络io,也没有多大影响。但是像基础流量等业务,每天几百上千亿条消息,都去单独请求外存,压力可想而知。所以平台为用户准备了本地缓存机制,通过一个多级缓存的架构,来缓解超大流量下外存访问的IO的压力。
数仓ETL会包含大量聚合、关联和排序等逻辑,在有界数据处理的时候,我们对算子行为能够做出较准确的判断。但是在无界数据处理的情况下,像关联、聚合等逻辑为了保证数据的正确性,会在更新一条记录的同时产生一条回撤消息,用来修复下游已经受到影响的数据,所以实际向下游传递的消息量可能会翻倍。而当涉及到多层算子嵌套,比如聚合嵌套关联,那么消息量还会继续膨胀。
为了解决这个问题,我们研究了框架的源码,并分析了业务数据的特征,发现实际上大多数情况消息在极短的时间内会被频繁更新多次,这也就意味着我们可以将多次请求合并成一次请求,来减少状态更新的次数,从而减少向下游发送的消息量。
上图是一个Join算子的优化案例,在分析了原理后,我们认为可以分三个阶段来对算子进行优化:
首先在输入阶段,可以对输入的消息做预处理,如果发现同key数据紧跟一条回撤事件,我们这两条消息可以同时消除,而保留最新的一条消息;
接下来在计算阶段,因为双流关联需要缓存左右流各自的状态,这样我们可以将短时间同key对状态的访问,合并成一次,减少状态访问次数;
最后在事件下发阶段,可以判断消息之间的关系,重复记录直接可被直接消除。
首先是我们的Web IDE,左面是菜单栏,可以用来管理项目空间,右面是一个web编辑器,用来开发ETL脚本,编辑器下还提供了控制台,用来查看调试日志和对比调试结果,还有语法错误提示。
上图是我们的逻辑模型管理模块,在此我们可以编辑自己的模型信息,查看血缘,资源占用量,数仓相关的业务属性等元数据,来辅助业务更好地进行数仓建模。
最后展示的是运维中心,所有的作业运行状况,运行指标,操作日志都可以通过这个平台来管理。
四、未来计划
从前面的分享大家可能会看出,平台前期的大部分精力都集中在解决业务实时数仓建设流程方面的问题上。随着数仓平台在业务上的逐渐推广,以及业务的深度使用,问题更多的出现在框架的runtime层面,比如超大作业的调度成功率和时长问题,超大数据量作业的状态访问性能问题。希望通过流批一套语义、一套执行层、一套存储,来彻底解决开发运维的成本问题。
另外,随着实时数据扩展至一些ToC业务场景,这些应用有着非常高的可用性要求,所以在这个方向上我们也要继续攻关下去。
最后就是终极问题,资源和性能比的问题,也就是在确定性的条件下,用最少的资源做最多的事情。
这是我们平台当前在建设的一个重点项目——数仓增量化生产,为达成真正的流批一体做一些前置性的技术储备和路径探索。
今天的分享就到这里,谢谢大家。
如果字段的最大可能长度超过255字节,那么长度值可能…
只能说作者太用心了,优秀
感谢详解
一般干个7-8年(即30岁左右),能做到年入40w-50w;有…
230721