李哲,京东搜推数据开发工程师,曾就职于美团点评,主要从事离线数据开发、流计算开发以及OLAP多维查询引擎的应用开发。
本文讨论了京东搜索在实时流量数据分析方面,利用Apache Flink和Apache Doris进行的探索和实践。流式计算在近些年的热度与日俱增,从Google Dataflow论文的发表,到Apache Flink计算引擎逐渐站到舞台中央,再到Apache Druid等实时分析型数据库的广泛应用,流式计算引擎百花齐放。但不同的业务场景,面临着不同的问题,没有哪一种引擎是万能的。我们希望京东搜索业务在流计算的应用实践,能够给到大家一些启发,也欢迎大家多多交流,给我们提出宝贵的建议。
京东集团的新使命是“技术为本,致力于更高效和可持续的世界”。京东搜索作为电商平台的一个入口,为众多商家与用户提供连接的纽带。京东搜索发挥着导流的作用,给用户提供表达需求的入口;为了正确理解用户意图,将用户的需求进行高效的转化,线上同时运行着多个AB实验算法,遍及POP形态与自营形态的多个商品,而这些商品所属的品类、所在的组织架构以及品牌店铺等属性,都需要在线进行监控,以衡量转化的效果和承接的能力。
目前搜索上层应用业务对实时数据的需求,主要包含三部分内容:
搜索整体数据的实时分析。
AB实验效果的实时监控。
热搜词的Top榜单以反映舆情的变化。
这三部分数据需求,都需要进行深度的下钻,维度细化需要到SKU粒度。同时我们也承担着搜索实时数据平台的建设任务,为下游用户输出不同层次的实时流数据。
我们的用户包括搜索的运营、产品、算法以及采销人员。虽然不同用户关心的数据粒度不同、时间频率不同、维度也不同,但是我们希望能够建立统一的实时OLAP数据仓库,并提供一套安全、可靠的、灵活的实时数据服务。
目前每日新增的曝光日志达到几亿条记录,而拆分到SKU粒度的日志则要翻10倍,再细拆到AB实验的SKU粒度时,数据量则多达上百亿记录,多维数据组合下的聚合查询要求秒级响应时间,这样的数据量也给团队带来了不小的挑战。
我们之前的方案是以Apache Storm引擎进行点对点的数据处理,这种方式在业务需求快速增长的阶段,可以快速的满足实时报表的需求。但是随着业务的不断发展、数据量逐渐增加以及需求逐渐多样化,弊端随之产生。例如灵活性差、数据一致性无法满足、开发效率较低、资源成本增加等。
为解决之前架构出现的问题,我们首先进行了架构升级,将storm引擎替换为Apache Flink,用以实现高吞吐、exactly once的处理语义。同时根据搜索数据的特点,将实时数据进行分层处理,构建出PV流明细层、SKU流明细层和AB实验流明细层,期望基于不同明细层的实时流,构建上层的实时OLAP层。
OLAP层的技术选型,需要满足以下几点:
数据延迟在分钟级,查询响应时间在秒级;
标准SQL交互引擎,降低使用成本;
支持join操作,方便维度增加属性信息;
流量数据可以近似去重,但订单行要精准去重;
高吞吐,每分钟数据量在千万级记录,每天数百亿条新增记录;
前端业务较多,查询并发度不能太低。
通过对比目前业界广泛使用的支持实时导入的OLAP引擎,我们在druid、ES、clickhouse和doris之间做了横向比较:
通过对比开源的几款实时OLAP引擎,我们发现doris和clickhouse能够满足我们的需求,但是clickhouse的并发度太低是个潜在的风险,而且clickhouse的数据导入没有事务支持,无法实现exactly once语义,对标准sql的支持也是有限的。
最终,我们选定doris作为聚合层,用于实时OLAP分析。对于流量数据,使用聚合模型建表;对于订单行,我们将聚合模型换成Uniq模型,保证同一个订单最终只会存储一条记录,从而达到订单行精准去重的目的。在flink处理时,我们也将之前的任务拆解,将反复加工的逻辑封装,每一次处理都生成新的topic流,明细层细分了不同粒度的实时流。新方案如下:
目前的技术架构中,flink的任务是非常轻的,state状态非常小,并没有使用KeyedState自定义状态,而OperatorState中只包含kafka的offset信息,这样保证任务的运行开销很小,稳定性大大提升。同时基于生产的数据明细层,我们直接使用了doris来充当聚合层的功能,将原本可以在flink中实现的窗口计算,下沉到doris中完成。利用doris的routine load消费实时数据,虽然数据在导入前是明细粒度,但是基于聚合模型,导入后自动进行异步聚合。而聚合度的高低,完全根据维度的个数与维度的基数决定。通过在base表上建立rollup,在导入时双写或多写并进行预聚合操作,这有点类似于物化视图的功能,可以将数据进行高度的汇总,以提升查询性能。
在明细层采用kafka直接对接到doris,还有一个好处就是这种方式天然的支持数据回溯。数据回溯简单说就是当遇到实时数据的乱序问题时,可以将“迟到”的数据进行重新计算,更新之前的结果。这是因为我们导入的是明细数据,延迟的数据无论何时到达都可以被写入到表中,而查询接口只需要再次进行查询即可获得最新的计算结果。最终方案的数据流图如下:
实时数据处理的技术实现,需要在低延迟、准确性和成本之间进行取舍。我们采用doris作为实时仓库的聚合层,其实也是在多方面进行了取舍。例如:
routine load的导入任务,为了达到更高的写入吞 吐量,我们将实时导入任务的最大时间间隔设置了30s,即增加了导入延迟,换来了更大的吞吐;
为了降低开发成本,节省计算资源,我们通过建立rollup来支持快速的查询需求,但是却增加了存储压力以及写入时的IO压力;
PV、UV等流量指标在聚合时采用的是HLL计算,降低了精度,换来了更短的查询响应时间。
以上几点取舍,是结合业务场景与需求的要求而决定的,并非绝对的情况。所以,面对实时数据大规模、无界、乱序等特点,实时流计算的选型,最终考虑的就是如何取舍。
上文提到我们在doris中建立了不同粒度的聚合模型,包括PV粒度、SKU粒度以及AB实验粒度。我们这里以每日生产数据量最大的曝光AB实验模型为例,阐述在doris中如何支持大促期间每日新增百亿条记录的查询的。
AB实验的效果监控,业务上需要10分钟、30分钟、60分钟以及全天累计等四个时间段,同时需要根据渠道、平台和一二三级品类等维度进行下钻分析,观测的指标则包含曝光PV、UV、曝光SKU件次、点击PV、点击UV等基础指标,以及CTR等衍生指标。
在数据建模阶段,我们将曝光实时数据建立聚合模型,其中K空间包含日期字段、分钟粒度的时间字段、渠道、平台、一二三级品类等,V空间则包含上述的指标列,其中UV和PV进行HLL近似计算,而SKU件次则采用SUM函数,每到来一条记录则加1。由于AB实验数据都是以AB实验位作为过滤条件,因此将实验位字段设置为分桶字段,查询时能够快速定位tablet分片。值得注意的是,HLL的近似度在目前PV和UV的基数下,实际情况误差在0.8%左右,符合预期。
目前doris的集群共30+台BE,存储采用的是支持NVMe协议的SSD硬盘。AB实验曝光topic的分区数为40+,每日新增百亿条数据。在数据导入阶段,我们主要针对导入任务的三个参数进行优化:最大时间间隔、最大数据量以及最大记录数。当这3个指标中任何一个达到设置的阈值时,任务都会触发导入操作。为了更好的了解任务每次触发的条件,达到10分钟消费6亿条记录的压测目标,我们通过间隔采样的方法,每隔3分钟采样一次任务的情况,获取Statistic信息中的receivedBytes、cimmittedTaskNum、loadedRows以及taskExecuteTimeMs数值。通过对上述数值在前后2个时间段的差值计算,确定每个任务触发的条件,并调整参数,以在吞吐和延迟之间进行平衡,最终达到压测的要求。
为了实现快速的多维数据查询,基于base表建立了不同的rollup,同时每个rollup的字段顺序,也要遵循过滤的字段尽可能放到前面的原则,充分利用前缀索引的特性。这里并不是rollup越多越好,因为每个rollup都会有相应的物理存储,每增加一个rollup,在写入时就会增加一份IO。最终我们在此表上建立了2个rollup,在要求的响应时间内尽可能多的满足查询需求。
京东搜索是在今年5月份引入doris的,第一个应用的上线到现在已经运行半年时间。目前集群版本是0.11.33,规模是30+台BE,线上同时运行着10+个routine load任务,每日新增数据条数在200亿+,已经成为京东体量最大的doris用户。从结果看,用doris替换flink的窗口计算,既可以提高开发效率,适应维度的变化,同时也可以降低计算资源,用doris充当实时数据仓库的聚合层,并提供统一的接口服务,保证了数据的一致性和安全性。
我们在使用中也遇到了查询相关的、任务调度相关的bug,也在推动京东OLAP平台升级到0.12版本。接下来待版本升级后,我们计划使用bitmap功能来支持UV等指标的精准去重操作,并将推荐实时业务应用doris实现。除此之外,为了完善实时数仓的分层结构,为更多业务提供数据输入,我们也计划使用适当的flink窗口开发聚合层的实时流,增加数据的丰富度和完整度。
如果字段的最大可能长度超过255字节,那么长度值可能…
只能说作者太用心了,优秀
感谢详解
一般干个7-8年(即30岁左右),能做到年入40w-50w;有…
230721