自2017年起,为保障内部业务在平时和大促期间的平稳运行,我们唯品会就开始基于Kubernetes深入打造高性能、稳定、可靠、易用的实时计算平台,我们现在的平台支持Flink、Spark、Storm等主流框架。
本文将分为五个方面,分享唯品会Flink的容器化实践应用以及产品化经验:
一、发展概览
在集群规模方面,我们有2000+的物理机,主要部署Kubernetes异地双活的集群利用Kubernetes的namespaces、labels和taints等实现业务隔离以及初步的计算负载隔离。
Flink任务数、Flink SQL任务数、Storm任务数、Spark任务数,这些线上实时应用加起来有1000多个,目前我们主要在支持Flink SQL这一块,SQL化是一个趋势,所以我们要支持SQL任务的上线平台。
我们从下往上进行解析实时计算平台的整体架构:
资源调度层(最底层)
实际上是用deployment的模式运行Kubernetes上,平台虽然是支持yarn调度,但是yarn调度是与批任务共享资源,所以主流任务还是运行在Kubernetes上的。并且,yarn调度这一层主要是跟离线部署的一套yarn集群,在2017年的时候,我们自研了Flink on Kubernetes的一套方案,因为底层调度分了两层,所以在大促资源紧张的时候,实时跟离线就可以做一个资源的借调。
存储层
主要用来支持公司内部基于Kafka实时数据vms,基于binlog的vdp数据和原生Kafka作为消息总线,状态存储在hdfs上,数据主要存入Redis,MySQL,HBase,Kudu,HDFS,ClickHouse等。
计算引擎层
主要是Flink、Storm、Spark,目前主推的是Flink这一块,每个框架会都会支持几个版本的镜像以满足不同的业务需求。
实时平台层
主要提供作业配置、调度、版本管理、容器监控、job监控、告警、日志等功能,提供多租户的资源管理(quota,label管理),提供Kafka监控。资源配置也分为大促日和平常日,大促的资源和平常的资源是不一样的,资源的权限管控也是不一样的。在Flink 1.11版本之前,平台自建元数据管理系统为Flink SQL管理schema,1.11版本开始,通过hive metastore与公司元数据管理系统融合。
应用层
主要是支持实时大屏、推荐、实验平台、实时监控和实时数据清洗的一些场景。
二、Flink容器化实践
上面是实时平台Flink容器化的架构图。Flink容器化其实是基于standalone模式部署的。
我们的部署模式共有client、job manager、task manager三个角色,每一个角色都会有一个deployment来控制。
用户通过平台上传任务jar包、配置等,存储于hdfs上。同时由平台维护的配置、依赖等也存储在hdfs上,当pod启动时,就会进行拉取等初始化操作。
client中主进程是一个由go开发的agent,当client启动时,会首先检查集群状态,当集群准备好后,从hdfs上拉取jar包,再向这个集群提交任务,client的主要任务是做容错,它主要功能还有监控任务状态,做savepoint等操作。
通过部署在每台物理机上的smart-agent采集容器的指标写入m3,以及通过Flink暴漏的接口将metrics写入prometheus,结合grafana展示。同样通过部署在每台物理机上的vfilebeat采集挂载出来的相关日志写入es,在dragonfly可以实现日志检索。
1)Flink 平台化
在实践过程中,一定要结合具体场景和易用性,再去考虑做平台化工作。
2)Flink稳定性
在我们应用部署以及运行过程中,异常肯定是不可避免的,这时候我们平台就需要做一些保证任务在出现异常状况后,保持稳定性的一些策略。
pod的健康和可用:
是由livenessProbe和readinessProbe检测的,同时指定pod的重启策略,Kubernetes本身是可以做一个pod的拉起。
Flink任务产生异常时:
Flink有自已本身的一套restart策略和failover机制,这是它第一层的保障。
在client中会定时监控Flink状态,同时将最新的checkpoint地址更新到自己的缓存中,并汇报到平台,固化到MySQL中。当Flink无法再重启时,由client重新从最新的成功checkpoint提交任务。这是它的第二层保障。这一层将checkpoint固化到MySQL中后,就不再使用Flink HA机制了,少了zk的组件依赖。
当前两层无法重启时或集群出现异常时,由平台自动从固化到MySQL中的最新checkpoint重新拉起一个集群,提交任务,这是它的第三层保障。
机房容灾:
用户的jar包,checkpoint都做了异地双HDFS存储。
异地双机房双集群。
Kafka监控是我们的任务监控里非常重要的一个环节,整体的流程如下:
平台提供监控Kafka 堆积,用户在界面上,可以配置自己的Kafka监控,告知在怎样的集群,以及用户消费message等配置信息,可以从MySQL中将用户Kafka监控配置提取后,再通过jmx监控Kafka,这样的信息采集之后,写入下游Kafka,再通过另一个Flink任务实时监控告警,同时将这些数据同步写入ck里面,从而反馈给我们的用户(这里也可以不用ck,也可以用Prometheus去做监控,都是可以的,但ck会更加适合),最后再用Grafana组件去展示给用户。
三、Flink SQL平台化建设
有了前面Flink的容器化方案之后,那么就要开始Flink SQL平台化建设了。大家都知道,这样流失的api开发起来,还是有一定的成本的, Flink肯定是比Storm快的,也相对比较稳定、容易一些,但是对于一些用户,特别是Java开发的一些同学来说,做这个是有一定门槛的。
Kubernetes的Flink容器化实现以后,方便了Flink api 应用的发布,但是对于Flink SQL的任务仍然不够便利。于是平台提供了更加方便的在线编辑发布、SQL管理等一栈式开发平台。
平台的Flink SQL方案如上图所示,任务发布系统与元数据管理系统是完全解耦的。
1)Flink SQL 任务发布平台化
在实践过程中,需要考虑易用性,做平台化工作,主操作界面如下图所示:
Flink SQL的版本管理、语法校验、拓扑图管理等;
UDF 通用和任务级别的管理,支持用户自定义udf;
提供参数化的配置界面,方便用户上线任务。
下面我们看一个用户界面配置的一个例子:
下面是集群配置的一个范例:
2)元数据管理
平台在1.11之前通过构建自己的元数据管理系统UDM,MySQL存储Kafka,Redis等schema,通过自定义catalog打通Flink与UDM,从而实现元数据管理。
在1.11之后,Flink集成hive逐渐完善,平台重构了Flink SQL框架,并通过部署一个SQL-gateway service服务,中间调用自己维护的sql-client jar包,从而与离线元数据打通,实现了实时离线元数据的统一,为之后的流批一体打好了基础。
在元数据管理系统创建的Flink表操作界面如下图所示:创建Flink表的元数据,持久化到hive里,Flink SQL启动时从hive里读取对应表的table schema信息。
平台对于官方原生支持或者不支持的connector进行整合和开发,镜像和connector,format等相关依赖进行解耦,可以快捷的进行更新与迭代。
1)Flink SQL相关实践
Flink SQL主要分为以下三层:
connector层
支持VDP connector读取source数据源;
支持Redis string、hash等数据类型的sink&维表关联;
支持kudu connector&catalog&维表关联;
支持protobuf format解析实时清洗数据;
支持vms connector读取source数据源;
支持ClickHouse connector sink分布式表&本地表高TPS写入;
Hive connector支持数坊Watermark Commit Policy分区提交策略&array<string>、decimal等复杂数据类型。
runntime层
主要支持拓扑图执行计划修改;
维表关联keyBy优化cache提升查询性能;
维表关联延迟join。
平台层
hive UDF;
支持json HLL相关处理函数;
支持Flink运行相关参数设置如minibatch、聚合优化参数;
Flink升级hadoop3。
2)拓扑图执行计划修改
针对现阶段SQL生成的stream graph并行度无法修改等问题,平台提供可修改的拓扑预览修改相关参数。平台会将解析后的FlinkSQL的excution plan json提供给用户,利用uid保证算子的唯一性,修改每个算子的并行度,chain策略等,也为用户解决反压问题提供方法。例如针对ClickHouse sink 小并发大批次的场景,我们支持修改ClickHouse sink并行度,source并行度=72,sink 并行度=24,提高ClickHouse sink tps。
3)维表关联keyBy优化cache
针对维表关联的情况,为了降低IO请求次数,降低维表数据库读压力,从而降低延迟,提高吞吐,有以下三种措施:
下面是维表关联KeyBy优化cache的图:
在优化之前的时候,维表关联LookupJoin算子和正常算子chain在一起,优化之间维表关联Lookup Join算子和正常算子不chain在一起,将join key 作为hash策略的key。
采用这种方式优化后,例如原来的3000W 数据量维表,10个TM节点,每个节点都要缓存3000W的数据,总共需要缓存3亿的量。而经过keyBy优化之后,每个TM节点只需要缓存3000W/10 =300W的数据量,总共缓存的数据量只有3000W,这非常大程度减少了缓存数据量。
4)维表关联延迟join
维表关联中,有很多业务场景,在维表数据新增数据之前,主流数据已经发生join操作,会出现关联不上的情况。因此,为了保证数据的正确,将关联不上的数据进行缓存,进行延迟join。
最简单的做法是,在维表关联的function里设置重试次数和重试间隔,这个方法会增大整个流的延迟,但主流qps不高的情况下,可以解决问题。
增加延迟join的算子,当join维表未关联时,先缓存起来,根据设置重试次数和重试间隔从而进行延迟的join。
四、应用案例
1)实时数据入仓
实时数仓主要分为三个过程:
流量数据一级Kafka进行实时数据清洗后,可以写到二级清洗Kafka,主要是protobuf格式,再通过Flink SQL写入hive 5min表,以便做后续的准实时ETL,加速ods层数据源的准备时间。
MySQL 业务库的数据,通过VDP解析形成binlog cdc消息流,再通过Flink SQL写入hive 5min表,同时会提交到自定义分区,再把分区状态汇报到服务接口,最后再做一个离线的调度。
业务系统通过VMS API产生业务Kafka消息流,通过Flink SQL解析之后写入hive 5min表。可以支持string、json、csv等消息格式。
使用 Flink SQL做流式数据入仓是非常方便的,而且 1.12 版本已经可以支持了小文件的自动合并,解决了小文件的问题,解决了大数据层一个非常普遍的痛点。
我们自定义分区提交策略,当前分区ready时候会调一下实时平台的分区提交api,在离线调度定时调度通过这个api检查分区是否ready。
采用Flink SQL统一入仓方案以后,我们可获得以下成果:
首先我们不仅解决了以往Flume方案不稳定的问题,用户也可以实现自助入仓,大大降低入仓任务的维护成本,稳定性也可以得到保障。
其次我们还提升了离线数仓的时效性,从小时级降低至5min粒度入仓,时效性可以增强。
2)实时指标计算
实时应用消费清洗后Kafka,通过Redis维表、api等方式关联,再通过Flink window 增量计算UV,持久化写到Hbase里。
实时应用消费VDP消息流之后,通过Redis维表、api等方式关联,再通过Flink SQL 计算出销售额等相关指标,增量upsert到kudu里,方便根据range分区批量查询,最终通过数据服务对实时大屏提供最终服务。
以往指标计算通常采用Storm方式,这个方式需要通过api定制化开发,采用这样Flink方案以后,我们可以获得了一下成果:
将计算逻辑切到Flink SQL上,降低计算任务口径变化快,解决修改上线周期慢等问题;
切换至Flink SQL可以做到快速修改,并且实现快速上线,降低了维护的成本。
3)实时离线一体化ETL数据集成
具体的流程如下图所示:
Flink SQL 在最近的版本中持续强化了维表 join 的能力,不仅可以实时关联数据库中的维表数据,现在还能关联 Hive 和 Kafka 中的维表数据,能灵活满足不同工作负载和时效性的需求。
基于 Flink 强大的流式 ETL 的能力,我们可以统一在实时层做数据接入和数据转换,然后将明细层的数据回流到离线数仓中。
我们通过将presto内部使用的HyperLogLog( 后面简称HLL) 实现引入到Spark UDAF函数里,打通HLL对象在Spark SQL与presto引擎之间的互通,如Spark SQL通过prepare函数生成的HLL对象,不仅可以在Spark SQL里merge查询而且可以在presto里进行merge查询。具体流程如下:
UV近似计算示例:
所以基于实时离线一体化ETL数据集成的架构,我们可获得以下成果:
唯品会实验平台是通过配置多维度分析和下钻分析,提供海量数据的A/B-test实验效果分析的一体化平台。一个实验是由一股流量(比如用户请求)和在这股流量上进行的相对对比实验的修改组成。实验平台对于海量数据查询有着低延迟、低响应、超大规模数据(百亿级)的需求。整体数据架构如下:
离线数据是通过waterdrop导入到ClickHouse里面去;
实时数据通过Flink SQL将Kafka里的数据清洗解析展开等操作之后,通过Redis维表关联商品属性,通过分布式表写入到ClickHouse,然后通过数据服务adhoc查询,通过数据服务提供对外的接口。
业务数据流如下:
业务数据流可以给大家简单介绍一下,我们的实验平台有一个很重要的ES场景,我们上线一个应用场景,我想看效果如何,上线产生的曝光、点击、加购、收藏是怎样的。我们需要把每一个数据的明细,比如说分流的一些数据,需要根据场景分区,写到ck里面去。
我们通过Flink SQL Redis connector,支持Redis的sink 、source维表关联等操作,可以很方便地读写Redis,实现维表关联,维表关联内可配置cache ,极大提高应用的TPS。通过Flink SQL 实现实时数据流的pipeline,最终将大宽表sink到CK 里,并按照某个字段粒度做murmurHash3_64 存储,保证相同用户的数据都存在同一shard 节点组内,从而使得ck大表之间的join 变成 local本地表之间的join,减少数据shuffle操作,提升join查询效率。
五、未来规划
首先我们会提高Flink的一个易用性,主要因为Flink SQL对于hive用户来说,使用起来还是有一点不一样的地方。不管是是hive,还是Spark SQL,都是批量处理的一个场景。
所以当前我们的Flink SQL 调试起来仍有很多不方便的地方,对于做离线hive用户来说还有一定的使用门槛,例如手动配置Kafka监控、任务的压测调优,所以如何能让用户的使用门槛降至最低,让用户只需要懂SQL或者懂业务,把Flink SQL里面的概念对用户屏蔽掉,简化用户的使用流程,是一个比较大的挑战。
将来我们考虑做一些智能监控,告诉用户当前任务存在的问题,不需要用户去学习太多的东西,尽可能自动化并给用户一些优化建议。
一方面,我们做数据湖主要是为了解决我们binlog实时更新的场景,目前我们的VDP binlog消息流,通过Flink SQL写入到hive ods层,以加速ods层数据源的准备时间,但是会产生大量重复消息去重合并。我们会考虑Flink + 数据湖的cdc入仓方案来做增量入仓。
另一方面我们希望通过数据湖,来替代我们Kudu,我们这边一部分重要的业务在用Kudu,虽然Kudu没有大量的使用,但鉴于Kudu的运维比一般的数据库运维复杂得多、比较小众,并且像订单打宽之后的Kafka消息流、以及聚合结果都需要非常强的实时upsert能力,所以我们就开始调研CDC+数据湖这种解决方案,用这种方案的增量upsert能力来替换kudu增量upsert场景。
Q&A
Q1:vdp connector 是 MySQL binlog读取吗?和canal是一种工具吗?
A1 :vdp是公司binlog同步的一个组件,将binlog解析之后发送到Kafka。是基于canal二次开发的。我们定义了一个cdc format可以对接公司的vdp Kafka数据源,与Canal CDC format有点类似。目前没有开源,使我们公司用的 binlog的一个同步方案。
Q2 : uv数据输出到hbase,销售数据输出到kudu,输出到了不同的数据源,主要是因为什么采取的这种策略?
A2 :kudu的应用场景没有hbase这么广泛。uv实时写入的TPS比较高,hbase比较适合单条查询的场景,写入hbase 高吞吐+低延迟,小范围查询延迟低;kudu的话具备一些OLAP的特性,可以存订单类明细,列存加速,结合Spark、presto等做OLAP分析。
Q3 : 请问一下,你们怎么解决的ClickHouse的数据更新问题?比如数据指标更新。
A3 : ck的更新是异步merge,只能在同一shard同一节点同一分区内异步merge,是弱一致性。对于指标更新场景不太建议使用ck。如果在ck里有更新强需求的场景,可以尝试 AggregatingMergeTree解决方案,用insert 替换update,做字段级的merge。
Q4:binlog写入怎么保证数据的去重和一致性?
A4 : binlog目前还没有写入ck的场景,这个方案看起来不太成熟。不建议这么做,可以用采用CDC + 数据湖的解决方案。
Q5 : 如果ck各个节点写入不均衡,怎么去监控,怎么解决?怎么样看数据倾斜呢?
A5 :可以通过ck的system.parts本地表监控每台机器每个表每个分区的写入数据量以及size,来查看数据分区,从而定位到某个表某台机器某个分区。
Q6 : 你们存在实时平台是如何做任务监控或者健康检查的?又是如何在出错后自动恢复的?现在用的是yarn-application模式吗?存在一个yarn application对应多个Flink job的情况吗?
A6 : 对于Flink 1.12+版本,支持了PrometheusReporter方式暴露一些Flink metrics指标,比如算子的watermark、checkpoint相关的指标如size、耗时、失败次数等关键指标,然后采集、存储起来做任务监控告警。
Flink原生的restart策略和failover机制,作为第一层的保证。
在client中会定时监控Flink状态,同时将最新的checkpoint地址更新到自己的缓存中,并汇报到平台,固化到MySQL中。当Flink无法再重启时,由client重新从最新的成功checkpoint提交任务。作为第二层保证。这一层将checkpoint固化到MySQL中后,就不再使用Flink HA机制了,少了zk的组件依赖。
当前两层无法重启时或集群出现异常时,由平台自动从固化到MySQL中的最新chekcpoint重新拉起一个集群,提交任务,作为第三层保证。
我们支持yarn-per-job模式,主要基于Flink on Kubernetes模式部署standalone集群。
Q7 : 目前你们大数据平台上所有的组件都是容器化的还是混合的?
A7 :目前我们实时这一块的组件Flink、Spark 、Storm、Presto等计算框架实现了容器化,详情可看上文1.2平台架构。
Q8 :kudu不是在Kubernetes上跑的吧?
A8 :kudu不是在Kubernetes上运行,这个目前还没有特别成熟的方案。并且kudu 是基于cloudera manager 运维的,没有上Kubernetes的必要。
Q9 : Flink实时数仓维度表存到ck中,再去查询ck的话,这样方案可以吗?
A9:这是可以的,是可以值得尝试的。事实表与维度表数据都可以存,可以按照某个字段做哈希(比如user_id),从而实现local join的效果。
↓点这里可回看本期直播
阅读原文
如果字段的最大可能长度超过255字节,那么长度值可能…
只能说作者太用心了,优秀
感谢详解
一般干个7-8年(即30岁左右),能做到年入40w-50w;有…
230721