本文介绍eBay广告数据平台的基本情况,并对比分析了ClickHouse与Druid的使用特点。基于ClickHouse表现出的良好性能和扩展能力,本文介绍了如何将eBay广告系统从Druid迁移至ClickHouse,希望能为同业人员带来一定的启发。
eBay广告数据平台为eBay第一方广告主(使用Promoted Listing服务的卖家)提供了广告流量、用户行为和效果数据分析功能。广告卖家通过卖家中心(Seller Hub)的营销标签页、效果标签页和公开API,有效掌控和对比店铺的营销活动和推广商品的流量、销量的实时和历史数据,并通过网页或者API 下载数据分析报告。
这一系统上线之初使用了自研的分布式SQL引擎,构建在对象存储系统之上。3年前随着广告流量增加,我们把数据引擎切换到Druid上。
这一平台的主要挑战如下:
数据量大:每日的插入数据记录有数百亿条,每秒的插入峰值接近一百万条;
离线数据摄入:在不影响实时数据摄入的情况下,每天需要对前1-2天的数据进行在线替换。根据上游数据团队发布清洗过的每日数据,广告数据平台需要在不影响查询的情况下每日替换实时数据,数据切换要求实现跨节点的全局原子操作;
完整性和一致性:面向卖家的财务数据,离线更新后的数据要求不能有遗漏和重复;实时数据要求端对端的延迟在十秒内。
Druid于2011年由Metamarkets开发,是一款高性能列式在线分析和存储引擎。它于2012年开源,2015年成为Apache基金会旗下项目。Druid在业界使用广泛,为千亿级数据提供亚秒级的查询延迟,擅长高可用、水平扩展;另外为数据摄入提供了很多非常方便的聚合、转换模版,内建支持多种数据源,最快可以在几十分钟内配置好新的数据表,包括数据定义和数据摄入链路(Lambda架构),大大提高了开发效率。
ClickHouse由俄罗斯最大的搜索引擎公司Yandex研发,设计目标是支持Yandex.Metrica(世界第二大Web分析平台)生成用户分析报表等核心功能。ClickHouse是一个数据库管理系统(DBMS),有数据库、表、视图、DDL、DML等概念,并提供了较为完整的SQL支持。其核心特性有如下几点:
高效的数据存储:通过数据压缩和列式存储,可以达到最高10倍的数据压缩率;
高效的数据查询:通过主键索引、向量化引擎处理、多处理器并发和分布式查询,最大压榨CPU的所有能力,在中小规模的数据量上尤为突出;
灵活的数据定义和接入:通过支持SQL语言、JDBC和关系模型,降低学习和迁移成本,可以和其他现有数据的产品无缝集成。
Druid虽然提供了很多非常方便的数据摄入功能,但它的组件构成也较为复杂,节点类型有6种(Overload, Coordinator, Middle Manager, Indexer, Broker和Historical)。除了自身的节点,Druid还依赖于MySQL存储元数据信息、Zookeeper选举Coordinator和Overlord、HDFS备份历史数据。
ClickHouse的架构采用了对等节点的设计,节点只有一种类型,没有主从节点。如果使用了副本功能,则依赖于Zookeeper保存数据段的同步进度。
与此同时,eBay的基础架构团队提出在定制ClickHouse的基础上,向产品团队提供列式数据库存储的服务。除了运维和生命周期管理,基础架构团队对ClickHouse进行改造和二次开发,进一步提高了数据摄入和存储的效率,并在离线摄入方面弥补了和Druid的功能差距。
Druid通过引入实时数据的索引任务,把实时数据处理成一个个分段数据(segment),并归档成历史数据。成为分段数据之后,该时段数据即不可写入。由于并发实时索引任务数的限制,我们设置了3个小时的窗口长度(每个小时一个任务),因此超过3个小时的数据就无法写入。在某些极端情况下,例如上游数据延迟或者实时数据消费过于滞后,就会导致离线数据替换前这部分数据的缺失。ClickHouse则没有这个限制,任意分区都可以随时写入。
ClickHouse支持的主键并不是传统意义下关系型数据库的主键。传统的主键要求每条表记录都有唯一的键值,通过查询主键可以唯一地查询到一条表记录。而在ClickHouse中,主键定义了记录在存储中排序的顺序,允许重复,所以称之为排序键似乎更加合理。事实上在ClickHouse里的主键定义通过ORDER BY声明,仅在个别场景中允许和排序键不一致(但必须是排序键的前缀)。
由于我们的产品是给卖家提供分析功能,几乎所有的查询限定在了单一卖家维度,因此通过主键按照卖家排序,可以极大地提高查询效率以及数据压缩率。
如上图所示,系统由4个部分组成:
实时数据获取模块,接入eBay的行为和交易实时消息平台;
离线数据替换模块,接入eBay内部的数据仓库平台;
ClickHouse部署和外围数据服务;
报表服务,支撑广告主、商家后台和eBay公开API。
ClickHouse提供了丰富的schema配置。这方面需要根据业务场景和数据模式反复斟酌和多次试验,因为不同的选择会对存储和性能有数量级的影响,一个错误的选择会导致后期巨大的调优和变更成本。
1)表引擎
ClickHouse的存储引擎的核心是合并树(MergeTree),以此为基础衍生出汇总合并树(SummingMergeTree),聚合合并树(AggregationMergeTree),版本折叠树(VersionCollapsingTree)等常用的表引擎。另外上述所有的合并树引擎都有复制功能(ReplicatedXXXMergeTree)的对应版本。
我们的广告数据平台的展示和点击数据选择了复制汇总合并树。这两类用户行为数据量极大,减小数据量节省存储开销并提升查询效率是模式设计的主要目标。ClickHouse在后台按照给定的维度汇总数据,降低了60%的数据量。销售数据选择了普通的复制合并树,一方面由于销售数据对某些指标有除汇总以外的聚合需求,另一方面由于本身数据量不大,合并数据的需求并不迫切。
2)主键
一般情况下,ClickHouse表的主键(Primary Key)和排序键(Order By Key)相同,但是采用了汇总合并树引擎(SummingMergeTree)的表可以单独指定主键。把一些不需要排序或者索引功能的维度字段从主键里排除出去,可以减小主键的大小(主键运行时需要全部加载到内存中),提高查询效率。
3)压缩
ClickHouse支持列级别的数据压缩,显著地减少原始数据的存储量,这也是列存储引擎的巨大优势。查询阶段,较小的存储占用也可以减少IO量。对不同列选择一种合适的压缩算法和等级,能把压缩和查询的平衡做到性价比最优。
ClickHouse的所有列默认使用LZ4压缩。除此以外,一般的数据列可以选择更高压缩率的算法如LZ4HC,ZSTD;而对于类似时间序列的单调增长数据可以选择DoubleDelta, Gorilla等特殊压缩算法。LZ4HC和ZSTD等高压缩率的算法还可以自己选择压缩级别。在我们的生产数据集上,ZSTD算法对String类型字段压缩效果较为显著。LZ4HC是LZ4的高压缩比改进版,更适用于非字符串类型。
更高的压缩率意味着更少的存储空间,同时由于降低了查询的IO量,可以间接提升查询性能。不过CPU也不是大风刮来的,数据的插入性能就成了牺牲品。根据我们内部测试的数据,在我们的生产数据集上使用LZ4HC(6)相比LZ4可以节省30%的数据,但实时数据摄取性能下降了60%。
4)低基
值得一提的是,对于基数较低的列(即列值多样性低),可以使用LowCardinality来降低原始存储空间(从而降低最终存储空间)。如果在使用压缩算法的情况下对一字符串类型的列使用LowCardinality,还能再缩小25%的空间量。
在我们的测试数据集上,如果整表组合使用LowCardinality、LZ4HC(6)和ZSTD(15),整体压缩比大约在原来的13%左右。
1)挑战
针对广告主的数据报表要求数据准确、一致。实时的行为数据存在少量的bot数据(需要离线清除),另外广告的归因也需要在离线阶段重新调整,因此我们引入了离线数据链路,在实时数据写入24-72小时之后,用离线数据替换实时数据。其中的挑战如下:
广告系统每天需要处理的用户离线数据量近1TB,在此之前,需要耗费大量时间将数据从Hadoop导入Druid。另外,导入期间的I/O、CPU和内存的开销对查询的压力不小。如何在保证数据一致性的同时,亦确保数据迁移的效率,是问题的关键;
如何在数据替换期间,确保用户可见的数据波动最小。这就要求数据替换操作是原子性的,或者至少对每个广告主都是原子的;
除了日常的离线数据更新,在数据仓库数据出现偏差遗漏时,需要支持大范围的数据修正和补偿。作业调度要求保证日常工作及时完成,并尽快完成数据修正工作。此外还需要监控数据更新中的各种指标,以应对各种突发状况。
Druid原生支持数据离线更新服务,我们与基础架构团队合作,在ClickHouse平台实现了这一功能。
2)数据架构
对于整合在线数据和离线数据的大数据架构,业界通常的做法是Lambda架构。即离线层和在线层分别导入数据,在展示层进行数据的合并。
我们也大致上采用了这一架构。但具体的做法和经典有所不同。ClickHouse里数据分区(partition)是一个独立的数据存储单元,每一个分区都可以单独从现有表里脱离(detach)、引入(attach)和替换(replace)。分区的条件可以自定义,一般按照时间划分。通过对数据表内数据分区的单个替换,我们可以做到查询层对底层数据更新的透明,也不需要额外的逻辑进行数据合并。
3)Spark聚合与分片
为了降低ClickHouse导入离线数据性能压力,我们引入了Spark任务对原始离线数据进行聚合和分片。每个分片可以分别拉取并导入数据文件,节省了数据路由、聚合的开销。
4)数据更新任务管理
A. 锁定分区拓扑结构
在处理数据前,离线数据更新系统向基础架构团队提供的服务请求锁定ClickHouse的分区拓扑结构,在此期间该分区的拓扑结构不会改变。服务端根据预先定义好的数据表结构与分区信息返回数据的分片逻辑与分片ID。离线数据更新系统根据拓扑信息提交Spark任务。多张表的数据处理通过Spark并行完成,显著提升了数据更新的速度。
B. 数据聚合与分片
对于每一张需要更新的表,启动一个Spark任务对数据进行聚合与分片。根据ClickHouse服务端返回的表结构与分片拓扑将数据写入Hadoop,同时输出数据替换阶段用于校验一致性的checksum与分片行数。系统通过Livy Server API提交并轮询任务状态,在有任务失败的情况下进行重试,以排除Spark集群资源不足导致的任务失败。离线数据更新不但要满足每天的批量数据更新需求,还需要支持过往数据的再次更新,以便同步上游数据在日常定时任务更新之外的数据变动。
我们利用平台团队封装的Spring Batch管理更新任务,按照日期将每天的数据划分为一个子任务。通过Spring Batch实现的Continuously Job保证在同一时刻子任务在运行的唯一性,避免产生任务竞争问题。对于过往数据的更新,我们将Batch任务分类,除了日常任务之外,还可以手动触发给定时间范围内的数据修正任务(如下图)。
C. 数据替换
在子任务中的所有Spark Job完成后,离线数据更新系统会调用基础架构团队提供的数据替换接口,发起数据替换请求。服务端按照定义好的分区,将数据从Hadoop直接写入ClickHouse,如图3所示。
离线数据更新系统的架构如图4所示。MySQL数据库用于记录数据替换过程中任务的状态与优先级,当Spark Job失败或者由于其他原因导致替换任务失败重启后,恢复任务的进度。
5) 原子性与一致性
为了保证数据替换的原子性,基础架构团队提供了分区替换的方式。在离线数据导入的过程中,首先创建目标分区的临时分区。当数据替换完毕并且校验完成之后,目标分区会被临时分区替换。针对不同机器上不同分片的原子性替换问题,基础架构团队为每一条数据引入了数据版本。对于每一个数据分区,都有对应的活跃版本号。直到待替换数据分区的所有分片都成功导入之后,分区的版本号进行更新。上游应用的同一条SQL只能读取同一分区一个版本的数据,每个分区的数据替换只感觉到一次切换,并不会出现同时读取新旧数据的问题。
广告平台报表生成应用因此在SQL层面引入了相应的修改,通过引入固定的WITH和PREWHERE语句,在字典中查询出每个数据分区对应的版本号,并在查询计划中排除掉不需要的数据分区。
为了确保数据替换的一致性,在完成Spark数据处理之后,离线数据更新系统会计算各数据分片的校验码与数据总量。当替换完毕之后,ClickHouse服务端会对分片数据进行校验,确保在数据搬迁过程中没有数据丢失和重复。
ClickHouse支持SQL查询(不完全),有HTTP和TCP两种连接方式,官方和第三方的查询工具和库丰富。用户可以使用命令行,JDBC或者可视化工具快速进行数据查询的开发和调试。ClickHouse通过MPP(Massively Parallel Processing) + SMP(Symmetric Multiprocessing)充分地利用机器资源,单条查询语句默认使用机器核数一半的CPU,因此ClickHouse不支持高并发的应用场景。在业务使用层面,最核心的问题是查询校验和并发控制,单条过大的查询或者过高的并发都会导致集群资源使用率过高,影响集群稳定性。
Ebay Seller Hub通过Reports Service接入ClickHouse查询,Reports Service提供了Public和Internal两套API。Internal API提供给Seller Hub以及其他内部的已知应用使用,Public API在eBay Developers Program(详情见:https://developer.ebay.com/)开放给第三方开发者。
Internal API的查询直接提交内部线程池执行,线程池的大小根据ClickHouse的集群机器数量设置。查询请求执行前会进行校验,过滤所有非法以及资源不可预估的请求。
Public API通过任务提交的方式异步执行查询,用户提交的查询任务存入DB中,Service内部的Schedule定时扫表,根据任务的状态串行执行查询任务。执行成功的任务上传生成Report到文件服务器,用户拿到URL后自行下载。执行失败的任务,根据错误类型(非法的请求,资源不足等)来选择是否在下一个周期再次执行。
在生产环境部署完成后,我们开启了数据双写,往ClickHouse里不断地插入实时数据和离线数据,直到达到Druid的数据水平。在数据一致性验证过后,我们镜像了一份生产服务的查询,然后把这些查询转发给ClickHouse。通过收集和对比Druid和ClickHouse的响应,我们能够验证ClickHouse链路的数据质量和查询性能。之后的灰度阶段,我们逐渐提升ClickHouse服务生产系统的比例,并保持Druid继续运行,以保证出现问题可以及时回滚。
数据可视化方面,我们需要提供类似Turnilo的可视化工具给开发、测试和BI人员使用。ClickHouse支持多种商业和开源的产品接入,我们选用了Cube.JS,并进行了简单的二次开发。
本文介绍了广告数据平台的基本情况,ClickHouse/Druid的特点对比和团队使用ClickHouse替换Druid的架构方案。ClickHouse表现出了良好的性能和扩展能力,并且还在快速的迭代更新。目前项目已经上线,接下来我们还会和大家继续分享过程中的碰到的一些问题和解决方法,欢迎大家持续关注。
如果字段的最大可能长度超过255字节,那么长度值可能…
只能说作者太用心了,优秀
感谢详解
一般干个7-8年(即30岁左右),能做到年入40w-50w;有…
230721