618临阵磨刀:Flink容器化与平台化建设少走弯路

王康 2021-06-08 10:17:50
本文根据王康老师在〖deeplus直播第268期〗线上分享演讲内容整理而成。(文末有获取本期PPT&回放的方式,不要错过)

自2017年起,为保障内部业务在平时和大促期间的平稳运行,我们唯品会就开始基于Kubernetes深入打造高性能、稳定、可靠、易用的实时计算平台,我们现在的平台支持Flink、Spark、Storm等主流框架。

 

本文将分为五个方面,分享唯品会Flink的容器化实践应用以及产品化经验:

 

图片

 

一、发展概览

 

 
1、集群规模

 

图片

 

在集群规模方面,我们有2000+的物理机,主要部署Kubernetes异地双活的集群利用Kubernetes的namespaces、labels和taints等实现业务隔离以及初步的计算负载隔离。

 

Flink任务数、Flink SQL任务数、Storm任务数、Spark任务数,这些线上实时应用加起来有1000多个,目前我们主要在支持Flink SQL这一块,SQL化是一个趋势,所以我们要支持SQL任务的上线平台。

 

 
2、平台架构

 

图片

 

我们从下往上进行解析实时计算平台的整体架构:

 

  • 资源调度层(最底层)

实际上是用deployment的模式运行Kubernetes上,平台虽然是支持yarn调度,但是yarn调度是与批任务共享资源,所以主流任务还是运行在Kubernetes上的。并且,yarn调度这一层主要是跟离线部署的一套yarn集群,在2017年的时候,我们自研了Flink on Kubernetes的一套方案,因为底层调度分了两层,所以在大促资源紧张的时候,实时跟离线就可以做一个资源的借调。

 

  •  存储层

主要用来支持公司内部基于Kafka实时数据vms,基于binlog的vdp数据和原生Kafka作为消息总线,状态存储在hdfs上,数据主要存入Redis,MySQL,HBase,Kudu,HDFS,ClickHouse等。

 

 

  • 计算引擎层

主要是Flink、Storm、Spark,目前主推的是Flink这一块,每个框架会都会支持几个版本的镜像以满足不同的业务需求。

 

 

  • 实时平台层

主要提供作业配置、调度、版本管理、容器监控、job监控、告警、日志等功能,提供多租户的资源管理(quota,label管理),提供Kafka监控。资源配置也分为大促日和平常日,大促的资源和平常的资源是不一样的,资源的权限管控也是不一样的。在Flink 1.11版本之前,平台自建元数据管理系统为Flink SQL管理schema,1.11版本开始,通过hive metastore与公司元数据管理系统融合。

 

 

  • 应用层

主要是支持实时大屏、推荐、实验平台、实时监控和实时数据清洗的一些场景。

 

二、Flink容器化实践

 

 
1、容器化方案

 

图片

 

上面是实时平台Flink容器化的架构图。Flink容器化其实是基于standalone模式部署的。

 

我们的部署模式共有client、job manager、task manager三个角色,每一个角色都会有一个deployment来控制。

 

用户通过平台上传任务jar包、配置等,存储于hdfs上。同时由平台维护的配置、依赖等也存储在hdfs上,当pod启动时,就会进行拉取等初始化操作。

 

 client中主进程是一个由go开发的agent,当client启动时,会首先检查集群状态,当集群准备好后,从hdfs上拉取jar包,再向这个集群提交任务,client的主要任务是做容错,它主要功能还有监控任务状态,做savepoint等操作。

 

通过部署在每台物理机上的smart-agent采集容器的指标写入m3,以及通过Flink暴漏的接口将metrics写入prometheus,结合grafana展示。同样通过部署在每台物理机上的vfilebeat采集挂载出来的相关日志写入es,在dragonfly可以实现日志检索。

 

1)Flink 平台化

 

在实践过程中,一定要结合具体场景和易用性,再去考虑做平台化工作。

 

图片

 

2)Flink稳定性

 

在我们应用部署以及运行过程中,异常肯定是不可避免的,这时候我们平台就需要做一些保证任务在出现异常状况后,保持稳定性的一些策略。

 

  • pod的健康和可用

  • 是由livenessProbe和readinessProbe检测的,同时指定pod的重启策略,Kubernetes本身是可以做一个pod的拉起。

 

  • Flink任务产生异常时

  •  Flink有自已本身的一套restart策略和failover机制,这是它第一层的保障。

     

  • 在client中会定时监控Flink状态,同时将最新的checkpoint地址更新到自己的缓存中,并汇报到平台,固化到MySQL中。当Flink无法再重启时,由client重新从最新的成功checkpoint提交任务。这是它的第二层保障。这一层将checkpoint固化到MySQL中后,就不再使用Flink HA机制了,少了zk的组件依赖。

     

  • 当前两层无法重启时或集群出现异常时,由平台自动从固化到MySQL中的最新checkpoint重新拉起一个集群,提交任务,这是它的第三层保障。

 

  • 机房容灾

  • 用户的jar包,checkpoint都做了异地双HDFS存储。

  • 异地双机房双集群。

 

 
2、Kafka监控方案

 

Kafka监控是我们的任务监控里非常重要的一个环节,整体的流程如下:

 

图片

 

 

平台提供监控Kafka 堆积,用户在界面上,可以配置自己的Kafka监控,告知在怎样的集群,以及用户消费message等配置信息,可以从MySQL中将用户Kafka监控配置提取后,再通过jmx监控Kafka,这样的信息采集之后,写入下游Kafka,再通过另一个Flink任务实时监控告警,同时将这些数据同步写入ck里面,从而反馈给我们的用户(这里也可以不用ck,也可以用Prometheus去做监控,都是可以的,但ck会更加适合),最后再用Grafana组件去展示给用户。

 

三、Flink SQL平台化建设

 

有了前面Flink的容器化方案之后,那么就要开始Flink SQL平台化建设了。大家都知道,这样流失的api开发起来,还是有一定的成本的, Flink肯定是比Storm快的,也相对比较稳定、容易一些,但是对于一些用户,特别是Java开发的一些同学来说,做这个是有一定门槛的。

 

Kubernetes的Flink容器化实现以后,方便了Flink api 应用的发布,但是对于Flink SQL的任务仍然不够便利。于是平台提供了更加方便的在线编辑发布、SQL管理等一栈式开发平台。

 

 
1、 Flink SQL方案

 

图片

 

平台的Flink SQL方案如上图所示,任务发布系统与元数据管理系统是完全解耦的。

 

1)Flink SQL 任务发布平台化

 

在实践过程中,需要考虑易用性,做平台化工作,主操作界面如下图所示:

 

  • Flink SQL的版本管理、语法校验、拓扑图管理等;

  • UDF 通用和任务级别的管理,支持用户自定义udf;

  • 提供参数化的配置界面,方便用户上线任务。

 

下面我们看一个用户界面配置的一个例子:

 

图片

 

下面是集群配置的一个范例:

 

图片

 

2)元数据管理

 

平台在1.11之前通过构建自己的元数据管理系统UDM,MySQL存储Kafka,Redis等schema,通过自定义catalog打通Flink与UDM,从而实现元数据管理。

 

在1.11之后,Flink集成hive逐渐完善,平台重构了Flink SQL框架,并通过部署一个SQL-gateway service服务,中间调用自己维护的sql-client jar包,从而与离线元数据打通,实现了实时离线元数据的统一,为之后的流批一体打好了基础。

 

在元数据管理系统创建的Flink表操作界面如下图所示:创建Flink表的元数据,持久化到hive里,Flink SQL启动时从hive里读取对应表的table schema信息。

 

图片

 

 
2、Flink SQL相关实践

 

平台对于官方原生支持或者不支持的connector进行整合和开发,镜像和connector,format等相关依赖进行解耦,可以快捷的进行更新与迭代。

 

1)Flink SQL相关实践

Flink SQL主要分为以下三层:

 

  •  connector层

  • 支持VDP connector读取source数据源;

  • 支持Redis string、hash等数据类型的sink&维表关联;

  • 支持kudu connector&catalog&维表关联;

  • 支持protobuf format解析实时清洗数据;

  • 支持vms connector读取source数据源;

  • 支持ClickHouse connector sink分布式表&本地表高TPS写入;

  • Hive connector支持数坊Watermark Commit Policy分区提交策略&array<string>、decimal等复杂数据类型。

 

  •  runntime层

  • 主要支持拓扑图执行计划修改;

  • 维表关联keyBy优化cache提升查询性能;

  • 维表关联延迟join。

 

  • 平台层

  • hive UDF;

  • 支持json HLL相关处理函数;

  • 支持Flink运行相关参数设置如minibatch、聚合优化参数;

  • Flink升级hadoop3。

 

2)拓扑图执行计划修改

 

针对现阶段SQL生成的stream graph并行度无法修改等问题,平台提供可修改的拓扑预览修改相关参数。平台会将解析后的FlinkSQL的excution plan json提供给用户,利用uid保证算子的唯一性,修改每个算子的并行度,chain策略等,也为用户解决反压问题提供方法。例如针对ClickHouse sink 小并发大批次的场景,我们支持修改ClickHouse sink并行度,source并行度=72,sink 并行度=24,提高ClickHouse sink tps。

 

图片

 

3)维表关联keyBy优化cache

 

针对维表关联的情况,为了降低IO请求次数,降低维表数据库读压力,从而降低延迟,提高吞吐,有以下三种措施:

 

图片

 

下面是维表关联KeyBy优化cache的图:

 

图片

    

在优化之前的时候,维表关联LookupJoin算子和正常算子chain在一起,优化之间维表关联Lookup Join算子和正常算子不chain在一起,将join key 作为hash策略的key。

 

采用这种方式优化后,例如原来的3000W 数据量维表,10个TM节点,每个节点都要缓存3000W的数据,总共需要缓存3亿的量。而经过keyBy优化之后,每个TM节点只需要缓存3000W/10 =300W的数据量,总共缓存的数据量只有3000W,这非常大程度减少了缓存数据量。

 

4)维表关联延迟join

 

维表关联中,有很多业务场景,在维表数据新增数据之前,主流数据已经发生join操作,会出现关联不上的情况。因此,为了保证数据的正确,将关联不上的数据进行缓存,进行延迟join。

 

最简单的做法是,在维表关联的function里设置重试次数和重试间隔,这个方法会增大整个流的延迟,但主流qps不高的情况下,可以解决问题。

 

增加延迟join的算子,当join维表未关联时,先缓存起来,根据设置重试次数和重试间隔从而进行延迟的join。

 

四、应用案例

 

 
1、实时数仓

 

1)实时数据入仓

 

图片

 

实时数仓主要分为三个过程:

 

流量数据一级Kafka进行实时数据清洗后,可以写到二级清洗Kafka,主要是protobuf格式,再通过Flink SQL写入hive 5min表,以便做后续的准实时ETL,加速ods层数据源的准备时间。

 

MySQL 业务库的数据,通过VDP解析形成binlog cdc消息流,再通过Flink SQL写入hive 5min表,同时会提交到自定义分区,再把分区状态汇报到服务接口,最后再做一个离线的调度。

 

业务系统通过VMS API产生业务Kafka消息流,通过Flink SQL解析之后写入hive 5min表。可以支持string、json、csv等消息格式。

 

使用 Flink SQL做流式数据入仓是非常方便的,而且 1.12 版本已经可以支持了小文件的自动合并,解决了小文件的问题,解决了大数据层一个非常普遍的痛点。

 

我们自定义分区提交策略,当前分区ready时候会调一下实时平台的分区提交api,在离线调度定时调度通过这个api检查分区是否ready。

 

采用Flink SQL统一入仓方案以后,我们可获得以下成果:

 

  • 首先我们不仅解决了以往Flume方案不稳定的问题,用户也可以实现自助入仓,大大降低入仓任务的维护成本,稳定性也可以得到保障。

 

  • 其次我们还提升了离线数仓的时效性,从小时级降低至5min粒度入仓,时效性可以增强。

 

2)实时指标计算

 

图片

 

  • 实时应用消费清洗后Kafka,通过Redis维表、api等方式关联,再通过Flink window 增量计算UV,持久化写到Hbase里。

     

  • 实时应用消费VDP消息流之后,通过Redis维表、api等方式关联,再通过Flink SQL 计算出销售额等相关指标,增量upsert到kudu里,方便根据range分区批量查询,最终通过数据服务对实时大屏提供最终服务。

 

以往指标计算通常采用Storm方式,这个方式需要通过api定制化开发,采用这样Flink方案以后,我们可以获得了一下成果:

 

  • 将计算逻辑切到Flink SQL上,降低计算任务口径变化快,解决修改上线周期慢等问题;

 

  • 切换至Flink SQL可以做到快速修改,并且实现快速上线,降低了维护的成本。

 

3)实时离线一体化ETL数据集成

 

具体的流程如下图所示:

 

图片

 

Flink SQL 在最近的版本中持续强化了维表 join 的能力,不仅可以实时关联数据库中的维表数据,现在还能关联 Hive 和 Kafka 中的维表数据,能灵活满足不同工作负载和时效性的需求。

 

基于 Flink 强大的流式 ETL 的能力,我们可以统一在实时层做数据接入和数据转换,然后将明细层的数据回流到离线数仓中。

 

我们通过将presto内部使用的HyperLogLog( 后面简称HLL) 实现引入到Spark UDAF函数里,打通HLL对象在Spark SQL与presto引擎之间的互通,如Spark SQL通过prepare函数生成的HLL对象,不仅可以在Spark SQL里merge查询而且可以在presto里进行merge查询。具体流程如下:

 

图片

 

UV近似计算示例: 

 

图片

 

所以基于实时离线一体化ETL数据集成的架构,我们可获得以下成果:

 

 
2、实验平台(Flink实时数据入OLAP)

 

唯品会实验平台是通过配置多维度分析和下钻分析,提供海量数据的A/B-test实验效果分析的一体化平台。一个实验是由一股流量(比如用户请求)和在这股流量上进行的相对对比实验的修改组成。实验平台对于海量数据查询有着低延迟、低响应、超大规模数据(百亿级)的需求。整体数据架构如下:

 

图片

 

  • 离线数据是通过waterdrop导入到ClickHouse里面去;

 

  • 实时数据通过Flink SQL将Kafka里的数据清洗解析展开等操作之后,通过Redis维表关联商品属性,通过分布式表写入到ClickHouse,然后通过数据服务adhoc查询,通过数据服务提供对外的接口。

 

业务数据流如下:

 

图片

 

业务数据流可以给大家简单介绍一下,我们的实验平台有一个很重要的ES场景,我们上线一个应用场景,我想看效果如何,上线产生的曝光、点击、加购、收藏是怎样的。我们需要把每一个数据的明细,比如说分流的一些数据,需要根据场景分区,写到ck里面去。

 

我们通过Flink SQL Redis connector,支持Redis的sink 、source维表关联等操作,可以很方便地读写Redis,实现维表关联,维表关联内可配置cache ,极大提高应用的TPS。通过Flink SQL 实现实时数据流的pipeline,最终将大宽表sink到CK 里,并按照某个字段粒度做murmurHash3_64 存储,保证相同用户的数据都存在同一shard 节点组内,从而使得ck大表之间的join 变成 local本地表之间的join,减少数据shuffle操作,提升join查询效率。

 

五、未来规划

 

 
1、提高Flink SQL易用性

 

首先我们会提高Flink的一个易用性,主要因为Flink SQL对于hive用户来说,使用起来还是有一点不一样的地方。不管是是hive,还是Spark SQL,都是批量处理的一个场景。

 

所以当前我们的Flink SQL 调试起来仍有很多不方便的地方,对于做离线hive用户来说还有一定的使用门槛,例如手动配置Kafka监控、任务的压测调优,所以如何能让用户的使用门槛降至最低,让用户只需要懂SQL或者懂业务,把Flink SQL里面的概念对用户屏蔽掉,简化用户的使用流程,是一个比较大的挑战。

 

将来我们考虑做一些智能监控,告诉用户当前任务存在的问题,不需要用户去学习太多的东西,尽可能自动化并给用户一些优化建议。

 

 
2、数据湖CDC分析方案落地

 

一方面,我们做数据湖主要是为了解决我们binlog实时更新的场景,目前我们的VDP binlog消息流,通过Flink SQL写入到hive ods层,以加速ods层数据源的准备时间,但是会产生大量重复消息去重合并。我们会考虑Flink + 数据湖的cdc入仓方案来做增量入仓。

 

另一方面我们希望通过数据湖,来替代我们Kudu,我们这边一部分重要的业务在用Kudu,虽然Kudu没有大量的使用,但鉴于Kudu的运维比一般的数据库运维复杂得多、比较小众,并且像订单打宽之后的Kafka消息流、以及聚合结果都需要非常强的实时upsert能力,所以我们就开始调研CDC+数据湖这种解决方案,用这种方案的增量upsert能力来替换kudu增量upsert场景。

 

>>>>

Q&A

 

Q1:vdp connector 是 MySQL binlog读取吗?和canal是一种工具吗?

A1 :vdp是公司binlog同步的一个组件,将binlog解析之后发送到Kafka。是基于canal二次开发的。我们定义了一个cdc format可以对接公司的vdp Kafka数据源,与Canal CDC format有点类似。目前没有开源,使我们公司用的 binlog的一个同步方案。

 

Q2 : uv数据输出到hbase,销售数据输出到kudu,输出到了不同的数据源,主要是因为什么采取的这种策略?

A2 :kudu的应用场景没有hbase这么广泛。uv实时写入的TPS比较高,hbase比较适合单条查询的场景,写入hbase 高吞吐+低延迟,小范围查询延迟低;kudu的话具备一些OLAP的特性,可以存订单类明细,列存加速,结合Spark、presto等做OLAP分析。

 

Q3 : 请问一下,你们怎么解决的ClickHouse的数据更新问题?比如数据指标更新。

A3 : ck的更新是异步merge,只能在同一shard同一节点同一分区内异步merge,是弱一致性。对于指标更新场景不太建议使用ck。如果在ck里有更新强需求的场景,可以尝试 AggregatingMergeTree解决方案,用insert 替换update,做字段级的merge。

 

Q4:binlog写入怎么保证数据的去重和一致性?

A4 : binlog目前还没有写入ck的场景,这个方案看起来不太成熟。不建议这么做,可以用采用CDC + 数据湖的解决方案。

 

Q5 : 如果ck各个节点写入不均衡,怎么去监控,怎么解决?怎么样看数据倾斜呢?

A5 :可以通过ck的system.parts本地表监控每台机器每个表每个分区的写入数据量以及size,来查看数据分区,从而定位到某个表某台机器某个分区。

 

Q6 : 你们存在实时平台是如何做任务监控或者健康检查的?又是如何在出错后自动恢复的?现在用的是yarn-application模式吗?存在一个yarn application对应多个Flink job的情况吗?

A6 : 对于Flink 1.12+版本,支持了PrometheusReporter方式暴露一些Flink metrics指标,比如算子的watermark、checkpoint相关的指标如size、耗时、失败次数等关键指标,然后采集、存储起来做任务监控告警。

 

Flink原生的restart策略和failover机制,作为第一层的保证。

 

在client中会定时监控Flink状态,同时将最新的checkpoint地址更新到自己的缓存中,并汇报到平台,固化到MySQL中。当Flink无法再重启时,由client重新从最新的成功checkpoint提交任务。作为第二层保证。这一层将checkpoint固化到MySQL中后,就不再使用Flink HA机制了,少了zk的组件依赖。

 

当前两层无法重启时或集群出现异常时,由平台自动从固化到MySQL中的最新chekcpoint重新拉起一个集群,提交任务,作为第三层保证。

 

我们支持yarn-per-job模式,主要基于Flink on Kubernetes模式部署standalone集群。

 

Q7 : 目前你们大数据平台上所有的组件都是容器化的还是混合的?

A7 :目前我们实时这一块的组件Flink、Spark 、Storm、Presto等计算框架实现了容器化,详情可看上文1.2平台架构。

 

Q8 :kudu不是在Kubernetes上跑的吧?

A8 :kudu不是在Kubernetes上运行,这个目前还没有特别成熟的方案。并且kudu 是基于cloudera manager 运维的,没有上Kubernetes的必要。

 

Q9 : Flink实时数仓维度表存到ck中,再去查询ck的话,这样方案可以吗?

A9:这是可以的,是可以值得尝试的。事实表与维度表数据都可以存,可以按照某个字段做哈希(比如user_id),从而实现local join的效果。

获取本期PPT
请添加小助手微信
图片

↓点这里可回看本期直播

阅读原文

 

最新评论
访客 2021年09月03日

有没有1000多张表

访客 2021年08月28日

metrics =》 metrix 错误

访客 2021年08月25日

只看到如何避免,如何减少书写慢 sql

访客 2021年08月25日

没看到如何治理呀

访客 2021年07月23日

果然k8s不是神!

活动预告