为了让你搭好数仓(离线+实时),不小心又肝3w字……

园陌 2022-03-16 10:23:35

文档大纲

 

图片

 

本文上半部分之前已经发过了,传送门:数仓建设保姆级5W字教程,离线实时一网打尽(理论+实战)

 

此篇文章是整个文档的下半部分,将接着上半部分从第五章开始。

 

五、实时数仓建设核心

 

 

1、实时计算初期

 

虽然实时计算在最近几年才火起来,但是在早期也有部分公司有实时计算的需求,但是数据量比较少,所以在实时方面形成不了完整的体系,基本所有的开发都是具体问题具体分析,来一个需求做一个,基本不考虑它们之间的关系,开发形式如下:

 

图片

早期实时计算

 

如上图所示,拿到数据源后,会经过数据清洗,扩维,通过Flink进行业务逻辑处理,最后直接进行业务输出。把这个环节拆开来看,数据源端会重复引用相同的数据源,后面进行清洗、过滤、扩维等操作,都要重复做一遍,唯一不同的是业务的代码逻辑是不一样的。

 

随着产品和业务人员对实时数据需求的不断增多,这种开发模式出现的问题越来越多:

 

  • 数据指标越来越多,“烟囱式”的开发导致代码耦合问题严重。

 

  • 需求越来越多,有的需要明细数据,有的需要 OLAP 分析。单一的开发模式难以应付多种需求。

 

  • 每个需求都要申请资源,导致资源成本急速膨胀,资源不能集约有效利用。

 

  • 缺少完善的监控系统,无法在对业务产生影响之前发现并修复问题。

 

大家看实时数仓的发展和出现的问题,和离线数仓非常类似,后期数据量大了之后产生了各种问题,离线数仓当时是怎么解决的?离线数仓通过分层架构使数据解耦,多个业务可以共用数据,实时数仓是否也可以用分层架构呢?当然是可以的,但是细节上和离线的分层还是有一些不同,稍后会讲到。

 

 

2、实时数仓建设

 

从方法论来讲,实时和离线是非常相似的,离线数仓早期的时候也是具体问题具体分析,当数据规模涨到一定量的时候才会考虑如何治理。分层是一种非常有效的数据治理方式,所以在实时数仓如何进行管理的问题上,首先考虑的也是分层的处理逻辑。

 

实时数仓的架构如下图:

 

图片

 

从上图中我们具体分析下每层的作用:

 

  • 数据源:在数据源的层面,离线和实时在数据源是一致的,主要分为日志类和业务类,日志类又包括用户日志,埋点日志以及服务器日志等。

 

  • 实时明细层:在明细层,为了解决重复建设的问题,要进行统一构建,利用离线数仓的模式,建设统一的基础明细数据层,按照主题进行管理,明细层的目的是给下游提供直接可用的数据,因此要对基础层进行统一的加工,比如清洗、过滤、扩维等。

 

  • 汇总层:汇总层通过Flink的简洁算子直接可以算出结果,并且形成汇总指标池,所有的指标都统一在汇总层加工,所有人按照统一的规范管理建设,形成可复用的汇总结果。

 

我们可以看出,实时数仓和离线数仓的分层非常类似,比如 数据源层,明细层,汇总层,乃至应用层,他们命名的模式可能都是一样的。但仔细比较不难发现,两者有很多区别:

 

  • 与离线数仓相比,实时数仓的层次更少一些:

 

  • 从目前建设离线数仓的经验来看,数仓的数据明细层内容会非常丰富,处理明细数据外一般还会包含轻度汇总层的概念,另外离线数仓中应用层数据在数仓内部,但实时数仓中,app 应用层数据已经落入应用系统的存储介质中,可以把该层与数仓的表分离。

 

  • 应用层少建设的好处:实时处理数据的时候,每建一个层次,数据必然会产生一定的延迟。

 

  • 汇总层少建的好处:在汇总统计的时候,往往为了容忍一部分数据的延迟,可能会人为的制造一些延迟来保证数据的准确。举例,在统计跨天相关的订单事件中的数据时,可能会等到 00:00:05 或者 00:00:10 再统计,确保 00:00 前的数据已经全部接受到位了,再进行统计。所以,汇总层的层次太多的话,就会更大的加重人为造成的数据延迟。

 

  • 与离线数仓相比,实时数仓的数据源存储不同:

 

  • 在建设离线数仓的时候,基本整个离线数仓都是建立在 Hive 表之上。但是,在建设实时数仓的时候,同一份表,会使用不同的方式进行存储。比如常见的情况下,明细数据或者汇总数据都会存在 Kafka 里面,但是像城市、渠道等维度信息需要借助 Hbase,MySQL 或者其他 KV 存储等数据库来进行存储。

 

 

3、Lambda架构的实时数仓

 

下图是基于 Flink 和 Kafka 的 Lambda 架构的具体实践,上层是实时计算,下层是离线计算,横向是按计算引擎来分,纵向是按实时数仓来区分:

 

图片

 

Lambda架构是比较经典的架构,以前实时的场景不是很多,以离线为主,当附加了实时场景后,由于离线和实时的时效性不同,导致技术生态是不一样的。Lambda架构相当于附加了一条实时生产链路,在应用层面进行一个整合,双路生产,各自独立。这在业务应用中也是顺理成章采用的一种方式。

 

双路生产会存在一些问题,比如加工逻辑double,开发运维也会double,资源同样会变成两个资源链路。因为存在以上问题,所以又演进了一个Kappa架构。

 

 

4、Kappa架构的实时数仓

 

Kappa架构相当于去掉了离线计算部分的Lambda架构,具体如下图所示:

 

图片

 

Kappa架构从架构设计来讲比较简单,生产统一,一套逻辑同时生产离线和实时。但是在实际应用场景有比较大的局限性,因为实时数据的同一份表,会使用不同的方式进行存储,这就导致关联时需要跨数据源,操作数据有很大局限性,所以在业内直接用Kappa架构生产落地的案例不多见,且场景比较单一。

 

关于 Kappa 架构,熟悉实时数仓生产的同学,可能会有一个疑问。因为我们经常会面临业务变更,所以很多业务逻辑是需要去迭代的。之前产出的一些数据,如果口径变更了,就需要重算,甚至重刷历史数据。对于实时数仓来说,怎么去解决数据重算问题?

 

Kappa 架构在这一块的思路是:首先要准备好一个能够存储历史数据的消息队列,比如 Kafka,并且这个消息队列是可以支持你从某个历史的节点重新开始消费的。接着需要新起一个任务,从原来比较早的一个时间节点去消费 Kafka 上的数据,然后当这个新的任务运行的进度已经能够和现在的正在跑的任务齐平的时候,你就可以把现在任务的下游切换到新的任务上面,旧的任务就可以停掉,并且原来产出的结果表也可以被删掉。

 

 

5、流批结合的实时数仓

 

随着实时 OLAP 技术的发展,目前开源的OLAP引擎在性能,易用等方面有了很大的提升,如Doris、Presto等,加上数据湖技术的迅速发展,使得流批结合的方式变得简单。

 

如下图是流批结合的实时数仓:

 

图片

 

数据从日志统一采集到消息队列,再到实时数仓,作为基础数据流的建设是统一的。之后对于日志类实时特征,实时大屏类应用走实时流计算。对于Binlog类业务分析走实时OLAP批处理。

 

我们看到流批结合的方式与上面几种架构的存储方式发生了变化,由Kafka换成了Iceberg,Iceberg是介于上层计算引擎和底层存储格式之间的一个中间层,我们可以把它定义成一种“数据组织格式”,底层存储还是HDFS,那么为什么加了中间层,就对流批结合处理的比较好了呢?Iceberg的ACID能力可以简化整个流水线的设计,降低整个流水线的延迟,并且所具有的修改、删除能力能够有效地降低开销,提升效率。Iceberg可以有效支持批处理的高吞吐数据扫描和流计算按分区粒度并发实时处理。

 

六、基于Flink SQL从0到1构建一个实时数仓

 

本小节内容来自大数据技术与数仓

 

实时数仓主要解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析,实时大屏展示,实时监控报警各个场景。虽然关于实时数仓架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。接下来主要介绍Flink SQL从0到1搭建一个实时数仓的demo,涉及到数据采集、存储、计算、可视化整个流程。

 

 

1、案例简介

 

本文以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成。

 

 

2、架构设计

 

具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入Kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行join,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。

 

图片

 

 

3、业务数据准备

 

1)订单表(order_info)

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `order_info` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',  `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',  `province_id` int(20) DEFAULT NULL COMMENT '地区',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';

 

2)订单详情表(order_detail)

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `order_detail` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';

 

3)商品表(sku_info)

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `sku_info` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',  `price` decimal(10,0) DEFAULT NULL COMMENT '价格',  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',  `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',  `weight` decimal(10,2) DEFAULT NULL COMMENT '重量',  `tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',  `category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类id(冗余)',  `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

 

4)商品一级类目表(base_category1)

 

  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_category1` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(10) NOT NULL COMMENT '分类名称',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';

 

5)商品二级类目表(base_category2)

 

  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_category2` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(200) NOT NULL COMMENT '二级分类名称',  `category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';

 

6)商品三级类目表(base_category3)

 

  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_category3` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(200) NOT NULL COMMENT '三级分类名称',  `category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';

 

7)省份表(区域表(base_region)base_province)

 

  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_province` (  `id` int(20) DEFAULT NULL COMMENT 'id',  `name` varchar(20) DEFAULT NULL COMMENT '省名称',  `region_id` int(20) DEFAULT NULL COMMENT '大区id',  `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码') ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

8)区域表(base_region)

 

  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_region` (  `id` int(20) NOT NULL COMMENT '大区id',  `region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',   PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 

 

4、数据处理流程

 

1)ods层数据同步

 

关于ODS层的数据同步这里就不详细展开。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:

 

图片

 

2)DIM层数据准备

 

本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表和商品维表。处理过程如下:

 

  • 区域维表

 

首先将mydw.base_provincemydw.base_region这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ---------------------------   省份--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_province`;CREATE TABLE `ods_base_province` (  `id` INT,  `name` STRING,  `region_id` INT ,  `area_code`STRING) WITH('connector' = 'kafka', 'topic' = 'mydw.base_province', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   省份--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_province`;CREATE TABLE `base_province` (    `id` INT,    `name` STRING,    `region_id` INT ,    `area_code`STRING,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_province', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   省份--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_provinceSELECT *FROM ods_base_province; -- ---------------------------   区域--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_region`;CREATE TABLE `ods_base_region` (  `id` INT,  `region_name` STRING) WITH('connector' = 'kafka', 'topic' = 'mydw.base_region', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   区域--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_region`;CREATE TABLE `base_region` (    `id` INT,    `region_name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_region', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   区域--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_regionSELECT *FROM ods_base_region;经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:-- ----------------------------------- DIM层,区域维表,-- 在MySQL中创建视图-- ---------------------------------DROP VIEW IF EXISTS dim_province;CREATE VIEW dim_province ASSELECT  bp.id AS province_id,  bp.name AS province_name,  br.id AS region_id,  br.region_name AS region_name,  bp.area_code AS area_codeFROM base_region br      JOIN base_province bp ON br.id= bp.region_id;这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:-- ---------------------------  一级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category1`;CREATE TABLE `ods_base_category1` (  `id` BIGINT,  `name` STRING)WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category1', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  一级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category1`;CREATE TABLE `base_category1` (    `id` BIGINT,    `name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category1', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  一级类目表--   MySQL Sink Load Data-- -------------------------  INSERT INTO base_category1SELECT *FROM ods_base_category1; -- ---------------------------  二级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category2`;CREATE TABLE `ods_base_category2` (  `id` BIGINT,  `name` STRING,  `category1_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category2', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  二级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category2`;CREATE TABLE `base_category2` (    `id` BIGINT,    `name` STRING,    `category1_id` BIGINT,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category2', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  二级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category2SELECT *FROM ods_base_category2; -- --------------------------- 三级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category3`;CREATE TABLE `ods_base_category3` (  `id` BIGINT,  `name` STRING,  `category2_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category3', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------  三级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category3`;CREATE TABLE `base_category3` (    `id` BIGINT,    `name` STRING,    `category2_id` BIGINT,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category3', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  三级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category3SELECT *FROM ods_base_category3; -- ---------------------------   商品表--   Kafka Source-- -------------------------  DROP TABLE IF EXISTS `ods_sku_info`;CREATE TABLE `ods_sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0)) WITH( 'connector' = 'kafka', 'topic' = 'mydw.sku_info', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   商品表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `sku_info`;CREATE TABLE `sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0),   PRIMARY KEY (tm_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'sku_info', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   商品--   MySQL Sink Load Data-- ------------------------- INSERT INTO sku_infoSELECT *FROM ods_sku_info;

 

经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ----------------------------------- DIM层,区域维表,-- 在MySQL中创建视图-- ---------------------------------DROP VIEW IF EXISTS dim_province;CREATE VIEW dim_province ASSELECT  bp.id AS province_id,  bp.name AS province_name,  br.id AS region_id,  br.region_name AS region_name,  bp.area_code AS area_codeFROM base_region br      JOIN base_province bp ON br.id= bp.region_id;

 

这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ---------------------------  一级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category1`;CREATE TABLE `ods_base_category1` (  `id` BIGINT,  `name` STRING)WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category1', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  一级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category1`;CREATE TABLE `base_category1` (    `id` BIGINT,    `name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category1', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  一级类目表--   MySQL Sink Load Data-- -------------------------  INSERT INTO base_category1SELECT *FROM ods_base_category1; -- ---------------------------  二级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category2`;CREATE TABLE `ods_base_category2` (  `id` BIGINT,  `name` STRING,  `category1_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category2', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  二级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category2`;CREATE TABLE `base_category2` (    `id` BIGINT,    `name` STRING,    `category1_id` BIGINT,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category2', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  二级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category2SELECT *FROM ods_base_category2; -- --------------------------- 三级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category3`;CREATE TABLE `ods_base_category3` (  `id` BIGINT,  `name` STRING,  `category2_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category3', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------  三级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category3`;CREATE TABLE `base_category3` (    `id` BIGINT,    `name` STRING,    `category2_id` BIGINT,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category3', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  三级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category3SELECT *FROM ods_base_category3; -- ---------------------------   商品表--   Kafka Source-- -------------------------  DROP TABLE IF EXISTS `ods_sku_info`;CREATE TABLE `ods_sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0)) WITH( 'connector' = 'kafka', 'topic' = 'mydw.sku_info', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   商品表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `sku_info`;CREATE TABLE `sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0),   PRIMARY KEY (tm_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'sku_info', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   商品--   MySQL Sink Load Data-- ------------------------- INSERT INTO sku_infoSELECT *FROM ods_sku_info;

 

经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info,用作后续使用的维表。

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ----------------------------------- DIM层,商品维表,-- 在MySQL中创建视图-- ---------------------------------CREATE VIEW dim_sku_info ASSELECT  si.id AS id,  si.sku_name AS sku_name,  si.category3_id AS c3_id,  si.weight AS weight,  si.tm_id AS tm_id,  si.price AS price,  si.spu_id AS spu_id,  c3.name AS c3_name,  c2.id AS c2_id,  c2.name AS c2_name,  c3.id AS c1_id,  c3.name AS c1_nameFROM(  sku_info si   JOIN base_category3 c3 ON si.category3_id = c3.id  JOIN base_category2 c2 ON c3.category2_id =c2.id  JOIN base_category1 c1 ON c2.category1_id = c1.id);

 

至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。

 

3)DWD层数据处理

 

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ---------------------------   订单详情--   Kafka Source-- -------------------------  DROP TABLE IF EXISTS `ods_order_detail`;CREATE TABLE `ods_order_detail`(  `id` BIGINT,  `order_id` BIGINT,  `sku_id` BIGINT,  `sku_name` STRING,  `img_url` STRING,  `order_price` DECIMAL(10,2),  `sku_num` INT,  `create_time` TIMESTAMP(0)) WITH( 'connector' = 'kafka', 'topic' = 'mydw.order_detail', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   订单信息--   Kafka Source-- -------------------------DROP TABLE IF EXISTS `ods_order_info`;CREATE TABLE `ods_order_info` (  `id` BIGINT,  `consignee` STRING,  `consignee_tel` STRING,  `total_amount` DECIMAL(10,2),  `order_status` STRING,  `user_id` BIGINT,  `payment_way` STRING,  `delivery_address` STRING,  `order_comment` STRING,  `out_trade_no` STRING,  `trade_body` STRING,  `create_time` TIMESTAMP(0) ,  `operate_time` TIMESTAMP(0) ,  `expire_time` TIMESTAMP(0) ,  `tracking_no` STRING,  `parent_order_id` BIGINT,  `img_url` STRING,  `province_id` INT) WITH('connector' = 'kafka', 'topic' = 'mydw.order_info', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ----------------------------------- DWD层,支付订单明细表dwd_paid_order_detail-- ---------------------------------DROP TABLE IF EXISTS dwd_paid_order_detail;CREATE TABLE dwd_paid_order_detail(  detail_id BIGINT,  order_id BIGINT,  user_id BIGINT,  province_id INT,  sku_id BIGINT,  sku_name STRING,  sku_num INT,  order_price DECIMAL(10,0),  create_time STRING,  pay_time STRING ) WITH (    'connector' = 'kafka',    'topic' = 'dwd_paid_order_detail',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- DWD层,已支付订单明细表-- 向dwd_paid_order_detail装载数据-- ---------------------------------INSERT INTO dwd_paid_order_detailSELECT  od.id,  oi.id order_id,  oi.user_id,  oi.province_id,  od.sku_id,  od.sku_name,  od.sku_num,  od.order_price,  oi.create_time,  oi.operate_timeFROM    (    SELECT *     FROM ods_order_info    WHERE order_status = '2' -- 已支付    ) oi JOIN    (    SELECT *    FROM ods_order_detail    ) od     ON oi.id = od.order_id;

 

4)ADS层数据

 

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

 

  • ads_province_index

 

首先在MySQL中创建对应的ADS目标表:ads_province_index

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE ads.ads_province_index(  province_id INT(10),  area_code VARCHAR(100),  province_name VARCHAR(100),  region_id INT(10),  region_name VARCHAR(100),  order_amount DECIMAL(10,2),  order_count BIGINT(10),  dt VARCHAR(100),  PRIMARY KEY (province_id, dt) ) ;

 

向MySQL的ADS层目标装载数据:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- Flink SQL Cli操作-- ----------------------------------- 使用 DDL创建MySQL中的ADS层表-- 指标:1.每天每个省份的订单数--      2.每天每个省份的订单金额-- ---------------------------------CREATE TABLE ads_province_index(  province_id INT,  area_code STRING,  province_name STRING,  region_id INT,  region_name STRING,  order_amount DECIMAL(10,2),  order_count BIGINT,  dt STRING,  PRIMARY KEY (province_id, dt) NOT ENFORCED  ) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/ads',    'table-name' = 'ads_province_index',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe');-- ----------------------------------- dwd_paid_order_detail已支付订单明细宽表-- ---------------------------------CREATE TABLE dwd_paid_order_detail(  detail_id BIGINT,  order_id BIGINT,  user_id BIGINT,  province_id INT,  sku_id BIGINT,  sku_name STRING,  sku_num INT,  order_price DECIMAL(10,2),  create_time STRING,  pay_time STRING ) WITH (    'connector' = 'kafka',    'topic' = 'dwd_paid_order_detail',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json'); -- ----------------------------------- tmp_province_index-- 订单汇总临时表-- ---------------------------------CREATE TABLE tmp_province_index(    province_id INT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额    pay_date DATE)WITH (    'connector' = 'kafka',    'topic' = 'tmp_province_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- tmp_province_index-- 订单汇总临时表数据装载-- ---------------------------------INSERT INTO tmp_province_indexSELECT      province_id,      count(distinct order_id) order_count,-- 订单数      sum(order_price * sku_num) order_amount, -- 订单金额      TO_DATE(pay_time,'yyyy-MM-dd') pay_dateFROM dwd_paid_order_detailGROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd');-- ----------------------------------- tmp_province_index_source-- 使用该临时汇总表,作为数据源-- ---------------------------------CREATE TABLE tmp_province_index_source(    province_id INT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额    pay_date DATE,    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列 ) WITH (    'connector' = 'kafka',    'topic' = 'tmp_province_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json'); -- ----------------------------------- DIM层,区域维表,-- 创建区域维表数据源-- ---------------------------------DROP TABLE IF EXISTS `dim_province`;CREATE TABLE dim_province (  province_id INT,  province_name STRING,  area_code STRING,  region_id INT,  region_name STRING ,  PRIMARY KEY (province_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'dim_province',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'scan.fetch-size' = '100'); -- ----------------------------------- 向ads_province_index装载数据-- 维表JOIN-- --------------------------------- INSERT INTO ads_province_indexSELECT  pc.province_id,  dp.area_code,  dp.province_name,  dp.region_id,  dp.region_name,  pc.order_amount,  pc.order_count,  cast(pc.pay_date as VARCHAR)FROMtmp_province_index_source pc  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp   ON dp.province_id = pc.province_id;

 

当提交任务之后:观察Flink WEB UI:

 

图片

 

查看ADS层的ads_province_index表数据:

 

图片

 

  • ads_sku_index

 

首先在MySQL中创建对应的ADS目标表:ads_sku_index

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE ads_sku_index(  sku_id BIGINT(10),  sku_name VARCHAR(100),  weight DOUBLE,  tm_id BIGINT(10),  price DOUBLE,  spu_id BIGINT(10),  c3_id BIGINT(10),  c3_name VARCHAR(100) ,  c2_id BIGINT(10),  c2_name VARCHAR(100),  c1_id BIGINT(10),  c1_name VARCHAR(100),  order_amount DOUBLE,  order_count BIGINT(10),  sku_count BIGINT(10),  dt varchar(100),  PRIMARY KEY (sku_id,dt));

 

向MySQL的ADS层目标装载数据:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ----------------------------------- 使用 DDL创建MySQL中的ADS层表-- 指标:1.每天每个商品对应的订单个数--      2.每天每个商品对应的订单金额--      3.每天每个商品对应的数量-- ---------------------------------CREATE TABLE ads_sku_index(  sku_id BIGINT,  sku_name VARCHAR,  weight DOUBLE,  tm_id BIGINT,  price DOUBLE,  spu_id BIGINT,  c3_id BIGINT,  c3_name VARCHAR ,  c2_id BIGINT,  c2_name VARCHAR,  c1_id BIGINT,  c1_name VARCHAR,  order_amount DOUBLE,  order_count BIGINT,  sku_count BIGINT,  dt varchar,  PRIMARY KEY (sku_id,dt) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/ads',    'table-name' = 'ads_sku_index',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe'); -- ----------------------------------- dwd_paid_order_detail已支付订单明细宽表-- ---------------------------------CREATE TABLE dwd_paid_order_detail(  detail_id BIGINT,  order_id BIGINT,  user_id BIGINT,  province_id INT,  sku_id BIGINT,  sku_name STRING,  sku_num INT,  order_price DECIMAL(10,2),  create_time STRING,  pay_time STRING ) WITH (    'connector' = 'kafka',    'topic' = 'dwd_paid_order_detail',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json'); -- ----------------------------------- tmp_sku_index-- 商品指标统计-- ---------------------------------CREATE TABLE tmp_sku_index(    sku_id BIGINT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额 order_sku_num BIGINT,    pay_date DATE)WITH (    'connector' = 'kafka',    'topic' = 'tmp_sku_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- tmp_sku_index-- 数据装载-- ---------------------------------INSERT INTO tmp_sku_indexSELECT      sku_id,      count(distinct order_id) order_count,-- 订单数      sum(order_price * sku_num) order_amount, -- 订单金额   sum(sku_num) order_sku_num,      TO_DATE(pay_time,'yyyy-MM-dd') pay_dateFROM dwd_paid_order_detailGROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd'); -- ----------------------------------- tmp_sku_index_source-- 使用该临时汇总表,作为数据源-- ---------------------------------CREATE TABLE tmp_sku_index_source(    sku_id BIGINT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额    order_sku_num BIGINT,    pay_date DATE,    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列 ) WITH (    'connector' = 'kafka',    'topic' = 'tmp_sku_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- DIM层,商品维表,-- 创建商品维表数据源-- ---------------------------------DROP TABLE IF EXISTS `dim_sku_info`;CREATE TABLE dim_sku_info (  id BIGINT,  sku_name STRING,  c3_id BIGINT,  weight DECIMAL(10,2),  tm_id BIGINT,  price DECIMAL(10,2),  spu_id BIGINT,  c3_name STRING,  c2_id BIGINT,  c2_name STRING,  c1_id BIGINT,  c1_name STRING,  PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'dim_sku_info',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'scan.fetch-size' = '100');-- ----------------------------------- 向ads_sku_index装载数据-- 维表JOIN-- ---------------------------------INSERT INTO ads_sku_indexSELECT  sku_id ,  sku_name ,  weight ,  tm_id ,  price ,  spu_id ,  c3_id ,  c3_name,  c2_id ,  c2_name ,  c1_id ,  c1_name ,  sc.order_amount,  sc.order_count ,  sc.order_sku_num ,  cast(sc.pay_date as VARCHAR)FROMtmp_sku_index_source sc   JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds  ON ds.id = sc.sku_id;

 

当提交任务之后:观察Flink WEB UI:

 

图片

 

查看ADS层的ads_sku_index表数据:

 

图片

 

 

5、FineBI展示

 

图片

 

七、数据治理

 

数仓建设真正的难点不在于数仓设计,而在于后续业务发展起来,业务线变的庞大之后的数据治理,包括资产治理、数据质量监控、数据指标体系的建设等。

 

其实数据治理的范围很⼴,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在DAMA 数据管理知识体系指南中,数据治理位于数据管理“车轮图”的正中央,是数据架构、数据建模、数据存储、数据安全、数据质量、元数据管理、主数据管理等10大数据管理领域的总纲,为各项数据管理活动提供总体指导策略。

 

图片

 

 

1、数据治理之道是什么

 

1)数据治理需要体系建设

 

为发挥数据价值需要满足三个要素:合理的平台架构、完善的治理服务、体系化的运营手段。

 

根据企业的规模、所属行业、数据量等情况选择合适的平台架构;治理服务需要贯穿数据全生命周期,保证数据在采集、加工、共享、存储、应用整个过程中的完整性、准确性、一致性和实效性;运营手段则应当包括规范的优化、组织的优化、平台的优化以及流程的优化等等方面。

 

2)数据治理需要夯实基础

 

数据治理需要循序渐进,但在建设初期至少需要关注三个方面:数据规范、数据质量、数据安全。规范化的模型管理是保障数据可以被治理的前提条件,高质量的数据是数据可用的前提条件,数据的安全管控是数据可以共享交换的前提条件。

 

3)数据治理需要IT赋能

 

数据治理不是一堆规范文档的堆砌,而是需要将治理过程中所产生的的规范、流程、标准落地到IT平台上,在数据生产过程中通过“以终为始”前向的方式进行数据治理,避免事后稽核带来各种被动和运维成本的增加

 

4)数据治理需要聚焦数据

 

数据治理的本质是管理数据,因此需要加强元数据管理和主数据管理,从源头治理数据,补齐数据的相关属性和信息,比如:元数据、质量、安全、业务逻辑、血缘等,通过元数据驱动的方式管理数据生产、加工和使用。

 

5)数据治理需要建管一体化

 

数据模型血缘与任务调度的一致性是建管一体化的关键,有助于解决数据管理与数据生产口径不一致的问题,避免出现两张皮的低效管理模式。

 

 

2、浅谈数据治理方式

 

如上面所说,数据治理的范围非常广,其中最重要的是数据质量治理,而数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理,评价维度包括完整性、规范性、一致性、准确性、唯一性、关联性等。

 

在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。

 

质量检测可参考以下维度:

 

图片

 

下面是根据美团的技术文章总结的几点具体治理方式:

 

1)规范治理

 

规范是数仓建设的保障。为了避免出现指标重复建设和数据质量差的情况,统一按照最详细、可落地的方法进行规范建设。

 

① 词根

 

词根是维度和指标管理的基础,划分为普通词根与专有词根,提高词根的易用性和关联性。

 

  • 普通词根:描述事物的最小单元体,如:交易-trade。

 

  • 专有词根:具备约定成俗或行业专属的描述体,如:美元-USD。

 

② 表命名规范

 

通用规范

 

  • 表名、字段名采用一个下划线分隔词根(示例:clienttype->client_type)。

 

  • 每部分使用小写英文单词,属于通用字段的必须满足通用字段信息的定义。

 

  • 表名、字段名需以字母为开头。

 

  • 表名、字段名最长不超过64个英文字符。

 

  • 优先使用词根中已有关键字(数仓标准配置中的词根管理),定期Review新增命名的不合理性。

 

  • 在表名自定义部分禁止采用非标准的缩写。

 

表命名规则

 

  • 表名称 = 类型 + 业务主题 + 子主题 + 表含义 + 存储格式 + 更新频率 +结尾,如下图所示:

 

图片

统一的表命名规范

 

③ 指标命名规范

 

结合指标的特性以及词根管理规范,将指标进行结构化处理。

 

  • 基础指标词根,即所有指标必须包含以下基础词根:

 

图片

 

  • 业务修饰词,用于描述业务场景的词汇,例如trade-交易。

 

  • 日期修饰词,用于修饰业务发生的时间区间。

 

图片

 

  • 聚合修饰词,对结果进行聚集操作。

 

图片

 

  • 基础指标,单一的业务修饰词+基础指标词根构建基础指标 ,例如:交易金额-trade_amt。

 

  • 派生指标,多修饰词+基础指标词根构建派生指标。派生指标继承基础指标的特性,例如:安装门店数量-install_poi_cnt。

 

  • 普通指标命名规范,与字段命名规范一致,由词汇转换即可以。图片

 

图片

 

2)架构治理

 

① 数据分层

 

优秀可靠的数仓体系,往往需要清晰的数据分层结构,即要保证数据层的稳定又要屏蔽对下游的影响,并且要避免链路过长,一般的分层架构如下:图片

 

图片

 

② 数据流向

 

稳定业务按照标准的数据流向进行开发,即ODS-->DWD-->DWA-->APP。非稳定业务或探索性需求,可以遵循ODS->DWD->APP或者ODS->DWD->DWT->APP两个模型数据流。在保障了数据链路的合理性之后,又在此基础上确认了模型分层引用原则:

 

  • 正常流向:ODS>DWD->DWT->DWA->APP,当出现ODS >DWD->DWA->APP这种关系时,说明主题域未覆盖全。应将DWD数据落到DWT中,对于使用频度非常低的表允许DWD->DWA。

 

  • 尽量避免出现DWA宽表中使用DWD又使用(该DWD所归属主题域)DWT的表。

 

  • 同一主题域内对于DWT生成DWT的表,原则上要尽量避免,否则会影响ETL的效率。

 

  • DWT、DWA和APP中禁止直接使用ODS的表, ODS的表只能被DWD引用。

 

  • 禁止出现反向依赖,例如DWT的表依赖DWA的表。

 

3)元数据治理

 

元数据可分为技术元数据和业务元数据:

 

技术元数据为开发和管理数据仓库的IT 人员使用,它描述了与数据仓库开发、管理和维护相关的数据,包括数据源信息、数据转换描述、数据仓库模型、数据清洗与更新规则、数据映射和访问权限等。

 

常见的技术元数据有:

 

  • 存储元数据:如表、字段、分区等信息。

 

  • 运行元数据:如大数据平台上所有作业运行等信息:类似于 Hive Job 日志,包括作业类型、实例名称、输入输出、 SQL 、运行参数、执行时间,执行引擎等。

 

  • 数据开发平台中数据同步、计算任务、任务调度等信息:包括数据同步的输入输出表和字段,以及同步任务本身的节点信息:计算任务主要有输入输出、任务本身的节点信息 任务调度主要有任务的依赖类型、依赖关系等,以及不同类型调度任务的运行日志等。

 

  • 数据质量和运维相关元数据:如任务监控、运维报警、数据质量、故障等信息,包括任务监控运行日志、告警配置及运行日志、故障信息等。

 

业务元数据为管理层和业务分析人员服务,从业务角度描述数据,包括商务术语、数据仓库中有什么数据、数据的位置和数据的可用性等,帮助业务人员更好地理解数据仓库中哪些数据是可用的以及如何使用。

 

  • 常见的业务元数据有维度及属性(包括维度编码,字段类型,创建人,创建时间,状态等)、业务过程、指标(包含指标名称,指标编码,业务口径,指标类型,责任人,创建时间,状态,sql等),安全等级,计算逻辑等的规范化定义,用于更好地管理和使用数据。数据应用元数据,如数据报表、数据产品等的配置和运行元数据。

 

元数据不仅定义了数据仓库中数据的模式、来源、抽取和转换规则等,而且是整个数据仓库系统运行的基础,元数据把数据仓库系统中各个松散的组件联系起来,组成了一个有机的整体。

 

元数据治理主要解决三个问题:

 

  • 通过建立相应的组织、流程和工具,推动业务标准的落地实施,实现指标的规范定义,消除指标认知的歧义;

 

  • 基于业务现状和未来的演进方式,对业务模型进行抽象,制定清晰的主题、业务过程和分析方向,构建完备的技术元数据,对物理模型进行准确完善的描述,并打通技术元数据与业务元数据的关系,对物理模型进行完备的刻画;

 

  • 通过元数据建设,为使用数据提效,解决“找数、理解数、评估”难题以及“取数、数据可视化”等难题。

 

4)安全治理

 

围绕数据安全标准,首先要有数据的分级、分类标准,确保数据在上线前有着准确的密级。第二,针对数据使用方,要有明确的角色授权标准,通过分级分类和角色授权,来保障重要数据拿不走。第三,针对敏感数据,要有隐私管理标准,保障敏感数据的安全存储,即使未授权用户绕过权限管理拿到敏感数据,也要确保其看不懂。第四,通过制定审计标准,为后续的审计提供审计依据,确保数据走不脱。

 

5)据生命周期治理

 

任何事物都具有一定的生命周期,数据也不例外。从数据的产生、加工、使用乃至消亡都应该有一个科学的管理办法,将极少或者不再使用的数据从系统中剥离出来,并通过核实的存储设备进行保留,不仅能够提高系统的运行效率,更好的服务客户,还能大幅度减少因为数据长期保存带来的储存成本。数据生命周期一般包含在线阶段、归档阶段(有时还会进一步划分为在线归档阶段和离线归档阶段)、销毁阶段三大阶段,管理内容包括建立合理的数据类别,针对不同类别的数据制定各个阶段的保留时间、存储介质、清理规则和方式、注意事项等。

 

图片

 

从上图数据生命周期中各参数间的关系中我们可以了解到,数据生命周期管理可以使得高价值数据的查询效率大幅提升,而且高价格的存储介质的采购量也可以减少很多;但是随着数据的使用程度的下降,数据被逐渐归档,查询时间也慢慢的变长;最后随着数据的使用频率和价值基本没有了之后,就可以逐渐销毁了。

 

八、数据质量建设

 

数据治理的范围非常广,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在这么多治理内容中,大家想下最重要的治理是什么?当然是数据质量治理,因为数据质量是数据分析结论有效性和准确性的基础,也是这一切的前提。所以如何保障数据质量,确保数据可用性是数据仓库建设中不容忽视的环节。

 

图片

 

数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理。

 

在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。

 

 

1、为什么要进行数据质量评估

 

很多刚入门的数据人,拿到数据后会立刻开始对数据进行各种探查、统计分析等,企图能立即发现数据背后隐藏的信息和知识。然而忙活了一阵才颓然发现,并不能提炼出太多有价值的信息,白白浪费了大量的时间和精力。比如和数据打交道的过程中,可能会出现以下的场景:

 

场景一:作为数据分析人员,要统计一下近 7 天用户的购买情况,结果从数仓中统计完发现,很多数据发生了重复记录,甚至有些数据统计单位不统一。

 

场景二:业务看报表,发现某一天的成交 gmv 暴跌,经过排查发现,是当天的数据缺失。

 

造成这一情况的一个重要因素就是忽视了对数据质量的客观评估,没有制定合理的衡量标准,导致没有发现数据已出现问题。所以,进行科学、客观的数据质量衡量标准是非常必要且十分重要的。

 

 

2、数据质量衡量标准

 

如何评估数据质量的好坏,业界有不同的标准,我总结了以下六个维度进行评估,包括完整性、规范性、一致性、准确性、唯一性、及时性。

 

图片

 

1)数据完整性

 

完整性指的是数据信息是否存在缺失的状况,数据缺失的情况可能是整个数据记录缺失,也可能是数据中某个字段信息的记录缺失。

 

2)数据规范性

 

规范性指的是描述数据遵循预定的语法规则的程度,是否符合其定义,比如数据的类型、格式、取值范围等。

 

3)数据一致性

 

一致性是指数据是否遵循了统一的规范,数据集合是否保持了统一的格式。数据质量的一致性主要体现在数据记录的规范和数据是否符合逻辑,一致性并不意味着数值上的绝对相同,而是数据收集、处理的方法和标准的一致。常见的一致性指标有:ID 重合度、属性一致、取值一致、采集方法一致、转化步骤一致。

 

4)数据准确性

 

准确性是指数据记录的信息是否存在异常或错误。和一致性不一样,存在准确性问题的数据不仅仅只是规则上的不一致,更为常见的数据准确性错误就如乱码,其次异常的大或者小的数据也是不符合条件的数据。常见的准确性指标有:缺失值占比、错误值占比、异常值占比、抽样偏差、数据噪声。

 

5)数据唯一性

 

唯一性指的是数据库的数据不存在重复的情形。比如真实成交 1 万条,但数据表有 3000 条重复了,成了 1.3 万条成交记录,这种数据不符合数据唯一性。

 

6)数据及时性

 

及时性是指数据从产生到可以查看的时间间隔,也叫数据的延时时长。比如一份数据是统计离线今日的,结果都是第二天甚至第三天才能统计完,这种数据不符合数据及时性。

 

还有一些其他的衡量标准,在此简单列出:

 

图片

 

 

3、数据质量管理流程

 

本节流程如下图所示:

 

图片

 

1)数据资产等级

 

① 等级定义

 

根据当数据质量不满足完整性、规范性、一致性、准确性、唯一性、及时性时,对业务的影响程度大小来划分数据的资产等级。

 

  • 毁灭性:数据一旦出错,会引起巨大的资产损失,面临重大收益受损等。标记为 L1

 

  • 全局性:数据用于集团业务、企业级效果评估和重要决策任务等。标记为 L2

 

  • 局部性:数据用于某个业务线的日常运营、分析报告等,如果出现问题会给该业务线造成一定的影响或影响其工作效率。标记为 L3

 

  • 一般性:数据用于日常数据分析,出现问题的带来的影响很小。标记为 L4

 

  • 未知性质:无法追溯数据的应用场景。标记为 Lx

 

重要程度:L1>L2>L3>L4>Lx。如果一份数据出现在多个应用场景中,则根据其最重要程度进行标记。

 

② 等级划分

 

定义数据资产等级后,我们可以从数据流程链路开始进行数据资产等级标记,完成数据资产等级确认,给不同的数据定义不同的重要程度。

 

  • 分析数据链路

 

数据是从业务系统中产生的,经过同步工具进入数据仓库系统中,在数据仓库中进行一般意义上的清洗、加工、整合、算法、模型等一系列运算后,再通过同步工具输出到数据产品中进行消费。而从业务系统到数据仓库再到数据产品都是以表的形式体现的,其流转过程如下图所示:

 

图片

 

  • 标记数据资产等级:

 

在所有数据链路上,整理出消费各个表的应用业务。通过给这些应用业务划分数据资产等级,结合数据的上下游依赖关系,将整个链路打上某一类资产等级标签。

 

举例:

 

假设公司有统一的订单服务中心。应用层的应用业务是按照业务线,商品类型和地域统计公司的订单数量和订单金额,命名为order_num_amount。

 

假设该应用会影响到整个企业的重要业务决策,我们可以把应用定级为 L2,从而整个数据链路上的表的数据等级,都可以标记为L2-order_num_amount,一直标记到源数据业务系统,如下图所示:

 

图片

 

2)数据加工过程卡点校验

 

① 在线系统数据校验

 

在线业务复杂多变,总是在不断地变更,每一次变更都会带来数据的变化,数据仓库需要适应这多变的业务发展,及时做到数据的准确性。

 

基于此,在线业务的变更如何高效地通知到离线数据仓库,同样也是需要考虑的问题。为了保障在线数据和离线数据的一致性,我们可以通过工具+人员管理并行的方式来尽可能的解决以上问题:既要在工具上自动捕捉每一次业务的变化,同时也要求开发人员在意识上自动进行业务变更通知。

 

  • 业务上线发布平台:

 

监控业务上线发布平台上的重大业务变更,通过订阅这个发布过程,及时将变更内容通知到数据部门。

 

由于业务系统复杂多变,若日常发布变更频繁,那么每次都通知数据部门,会造成不必要的资源浪费。这时,我们可以使用之前已经完成标记的数据资产等级标签,针对涉及高等级数据应用的数据资产,整理出哪些类型的业务变更会影响数据的加工或者影响数据统计口径的调整,则这些情况都必须及时通知到数据部门。

 

如果公司没有自己的业务发布平台,那么就需要与业务部门约定好,针对高等级的数据资产的业务变更,需要以邮件或者其他书面的说明及时反馈到数据部门。

 

  • 操作人员管理:

 

工具只是辅助监管的一种手段,而使用工具的人员才是核心。数据资产等级的上下游打通过程需要通知给在线业务系统开发人员,使其知道哪些是重要的核心数据资产,哪些暂时还只是作为内部分析数据使用,提高在线开发人员的数据风险意识。

 

可以通过培训的方式,把数据质量管理的诉求,数据质量管理的整个数据加工过程,以及数据产品的应用方式及应用场景告知在线开发人员,使其了解数据的重要性、价值及风险。确保在线开发人员在完成业务目标的同时,也要考虑数据的目标,保持业务端和数据段一致。

 

② 离线系统数据校验

 

数据从在线业务系统到数据仓库再到数据产品的过程中,需要在数据仓库这一层完成数据的清洗、加工。正是有了数据的加工,才有了数据仓库模型和数据仓库代码的建设。如何保障数据加过程中的质量,是离线数据仓库保障数据质量的一个重要环节。

 

在这些环节中,我们可以采用以下方式来保障数据质量:

 

  • 代码提交核查:

     

开发相关的规则引擎,辅助代码提交校验。规则分类大致为:

 

  • 代码规范类规则:如表命名规范、字段命名规范、生命周期设置、表注释等;

 

  • 代码质量类规则:如分母为 0 提醒、NUll 值参与计算提醒等;

 

  • 代码性能类规则:如大表提醒、重复计算监测、大小表 join 操作提醒等。

 

  • 代码发布核查:

 

加强测试环节,测试环境测试后再发布到生成环境,且生成环境测试通过后才算发布成功。

 

  • 任务变更或重跑数据:

 

在进行数据更新操作前,需要通知下游数据变更原因、变更逻辑、变更时间等信息。下游没有异议后,再按照约定时间执行变更发布操作。

 

3)数据处理风险监控

 

风险点监控主要是针对数据在日常运行过程中容易出现的风险进行监控并设置报警机制,主要包括在线数据离线数据运行风险点监控。

 

① 数据质量监控

 

在线业务系统的数据生产过程需要保证数据质量,主要根据业务规则对数据进行监控。

 

比如交易系统配置的一些监控规则,如订单拍下时间、订单完结时间、订单支付金额、订单状态流转等都配置了校验规则。订单拍下时间肯定不会大于当天时间,也不会小于业务上线时间,一旦出现异常的订单创建时间,就会立刻报警,同时报警给到多人。通过这种机制,可以及时发现并解决问题。

 

随着业务负责程度的提升,会导致规则繁多、规则配置的运行成本增大,这时可以按照我们之前的数据资产等级有针对性的进行监控。

 

离线数据风险点监控主要包括对数据准确性和数据产出及时性的监控。对数据调度平台上所有数据处理调度进行监控。

 

我们以阿里的 DataWorks 数据调度工具为例,DataWorks 是基于 MaxCompute 计算引擎的一站式开发工场,帮助企业快速完成数据集成、开发、治理、质量、安全等全套数据研发工作。

 

DataWorks 中的 DQC 通过配置数据质量校验规则,实现离线数据处理中的数据质量监控报警机制。

 

下图是 DQC 的工作流程图:

 

图片

 

DQC 数据监控规则有强规则和弱规则:

 

  • 强规则:一旦触发报警就会阻断任务的执行(将任务置为失败状态,使下游任务不会被触发执行)。

 

  • 弱规则:只报警但不阻断任务的执行。

 

DQC 提供常用的规则模板,包括表行数较 N 天前波动率、表空间大小较 N 天前波动率、字段最大/最小/平均值相比 N 天前波动率、字段空值/唯一个数等。

 

DQC 检查其实也是运行 SQL 任务,只是这个任务是嵌套在主任务中的,一旦检查点太多自然就会影响整体的性能,因此还是依赖数据产等级来确定规则的配置情况。比如 L1、L2 类数据监控率要达到 90% 以上,规则类型需要三种及以上,而不重要的数据资产则不强制要求。

 

② 数据及时性监控

 

在确保数据准确性的前提下,需要进一步让数据能够及时地提供服务,否则数据的价值将大幅度降低,甚至没有价值,所以确保数据及时性也是保障数据质量重中之重的一环。

 

  • 任务优先级:

 

对于DataWorks平台的调度任务,可以通过智能监控工具进行优先级设置。DataWorks的调度是一个树形结构,当配置了叶子节点的优先级,这个优先级会传递到所有的上游节点,而叶子节点通常就是服务业务的消费节点。

 

因此,在优先级的设置上,要先确定业务的资产等级,等级越高的业务对应的消费节点优先级越高,优先调度并占用计算资源,确保高等级业务的准时产出。

 

总之,就是按照数据资产等级优先执行高等级数据资产的调度任务,优先保障高等级业务的数据需求。

 

  • 任务报警:

 

任务报警和优先级类似,通过DataWorks的智能监控工具进行配置,只需要配置叶子节点即可向上游传递报警配置。任务执行过程中,可能出错或延迟,为了保障最重要数据(即资产等级高的数据)产出,需要立即处理出错并介入处理延迟。

 

  • DataWorks智能监控:

 

DataWorks进行离线任务调度时,提供智能监控工具,对调度任务进行监控告警。根据监控规则和任务运行情况,智能监控决策是否报警、何时报警、如何报警以及给谁报警。智能监控会自动选择最合理的报警时间、报警方式以及报警对象。

 

最后,要想真正解决数据质量问题,就要明确业务需求并从需求开始控制数据质量,并建立数据质量管理机制。从业务出发做问题定义,由工具自动、及时发现问题,明确问题责任人,通过邮件、短信等方式进行通知,保证问题及时通知到责任人。跟踪问题整改进度,保证数据质量问题全过程的管理。

 

九、数仓规范建设指南

 

 

1、数仓公共开发规范

 

1)层次调用规范

 

稳定业务按照标准的数据流向进行开发,即 ODS –> DWD –> DWS –> APP。非稳定业务或探索性需求,可以遵循 ODS -> DWD -> APP 或者 ODS -> DWD -> DWM ->APP 两个模型数据流。

 

在保障了数据链路的合理性之后,也必须保证模型分层引用原则:

 

  • 正常流向:ODS -> DWD -> DWM -> DWS -> APP,当出现 ODS -> DWD -> DWS -> APP 这种关系时,说明主题域未覆盖全。应将 DWD 数据落到 DWM 中,对于使用频度非常低的表允许 DWD -> DWS。

 

  • 尽量避免出现 DWS 宽表中使用 DWD 又使用(该 DWD 所归属主题域)DWM 的表。

 

  • 同一主题域内对于 DWM 生成 DWM 的表,原则上要尽量避免,否则会影响 ETL 的效率。

 

  • DWM、DWS 和 APP 中禁止直接使用 ODS 的表, ODS 的表只能被 DWD 引用。

 

  • 禁止出现反向依赖,例如 DWM 的表依赖 DWS 的表。

 

举例:

 

图片

 

2)数据类型规范

 

需统一规定不同的数据的数据类型,严格按照规定的数据类型执行:

 

  • 金额:double 或使用 decimal(11,2) 控制精度等,明确单位是分还是元。

 

  • 字符串:string。

 

  • id类:bigint。

 

  • 时间:string。

 

  • 状态:string

 

3)数据冗余规范

 

宽表的冗余字段要确保:

 

  • 冗余字段要使用高频,下游3个或以上使用。

 

  • 冗余字段引入不应造成本身数据产生过多的延后。

 

  • 冗余字段和已有字段的重复率不应过大,原则上不应超过60%,如需要可以选择join或原表拓展。

 

4)NULL字段处理规范

 

  • 对于维度字段,需设置为-1

 

  • 对于指标字段,需设置为 0

 

5)指标口径规范

 

保证主题域内,指标口径一致,无歧义。

 

通过数据分层,提供统一的数据出口,统一对外输出的数据口径,避免同一指标不同口径的情况发生。

 

① 指标梳理

 

指标口径的不一致使得数据使用的成本极高,经常出现口径打架、反复核对数据的问题。在数据治理中,我们将需求梳理到的所有指标进行进一步梳理,明确其口径,如果存在两个指标名称相同,但口径不一致,先判断是否是进行合并,如需要同时存在,那么在命名上必须能够区分开。

 

② 指标管理

 

指标管理分为原子指标维护和派生指标维护。

 

原子指标:

 

  • 选择原子指标的归属产线、业务板块、数据域、业务过程

  • 选择原子指标的统计数据来源于该业务过程下的原始数据源

  • 录入原子指标的英文名称、中文名称、概述

  • 填写指标函数

  • 系统根据指标函数自动生成原子指标的定义表达式

  • 系统根据指标定义表达式以及数据源表生成原子指标SQL

 

派生指标:

 

  • 在原子指标的基础之上选择了一些维度或者修饰限定词。

 

6)数据表处理规范

 

① 增量表

 

新增数据,增量数据是上次导出之后的新数据。

 

  • 记录每次增加的量,而不是总量;

 

  • 增量表,只报变化量,无变化不用报;

 

  • 每天一个分区。

 

② 全量表

 

每天的所有的最新状态的数据。

 

  • 全量表,有无变化,都要报;

 

  • 每次上报的数据都是所有的数据(变化的 + 没有变化的);

 

  • 只有一个分区。

 

③ 快照表

 

按日分区,记录截止数据日期的全量数据。

 

  • 快照表,有无变化,都要报;

 

  • 每次上报的数据都是所有的数据(变化的 + 没有变化的);

 

  • 一天一个分区。

 

④ 拉链表

 

记录截止数据日期的全量数据。

 

  • 记录一个事物从开始,一直到当前状态的所有变化的信息;

 

  • 拉链表每次上报的都是历史记录的最终状态,是记录在当前时刻的历史总 量;

 

  • 当前记录存的是当前时间之前的所有历史记录的最后变化量(总量);

 

  • 只有一个分区。

 

7)表的生命周期管理

 

这部分主要是要通过对历史数据的等级划分与对表类型的划分生成相应的生命周期管理矩阵。

 

① 历史数据等级划分

 

主要将历史数据划分P0、Pl、P2、P3 四个等级,其具体定义如下:

 

  • P0 :非常重要的主题域数据和非常重要的应用数据,具有不可恢复性,如交易、日志、集团 KPI 数据、 IPO 关联表。

 

  • Pl :重要的业务数据和重要的应用数据,具有不可恢复性,如重要的业务产品数据。

 

  • P2 :重要的业务数据和重要的应用数据,具有可恢复性,如交易线 ETL 产生的中间过程数据。

 

  • P3 :不重要的业务数据和不重要的应用数据,具有可恢复性,如某些 SNS 产品报表。

 

② 表类型划分

 

  • 事件型流水表(增量表)

 

事件型流水表(增量表)指数据无重复或者无主键数据,如日志。

 

  • 事件型镜像表(增量表)

 

事件型镜像表(增量表)指业务过程性数据,有主键,但是对于同样主键的属性会发生缓慢变化,如交易、订单状态与时间会根据业务发生变更。

 

  • 维表

 

维表包括维度与维度属性数据,如用户表、商品表。

 

  • Merge 全量表

 

Merge 全量表包括业务过程性数据或者维表数据。由于数据本身有新增的或者发生状态变更,对于同样主键的数据可能会保留多份,因此可以对这些数据根据主键进行 Merge 操作,主键对应的属性只会保留最新状态,历史状态保留在前一天分区 中。例如,用户表、交易表等都可以进行 Merge 操作。

 

  • ETL 临时表

 

ETL 临时表是指 ETL 处理过程中产生的临时表数据,一般不建议保留,最多7天。

 

  • TT 临时数据

 

TT 拉取的数据和 DbSync 产生的临时数据最终会流转到 DS 层,ODS 层数据作为原始数据保留下来,从而使得 TT&DbSync 上游数据成为临时数据。这类数据不建议保留很长时间,生命周期默认设置为 93天,可以根据实际情况适当减少保留天数。

 

  • 普通全量表

 

很多小业务数据或者产品数据,BI一般是直接全量拉取,这种方式效率快,对存储压力也不是很大,而且表保留很长时间,可以根据历史数据等级确定保留策略。

 

通过上述历史数据等级划分与表类型划分,生成相应的生命周期管理矩阵,如下表所示:

 

图片

 

 

2、数仓各层开发规范

 

1)ODS层设计规范

 

① 同步规范:

 

  • 一个系统源表只允许同步一次;

 

  • 全量初始化同步和增量同步处理逻辑要清晰;

 

  • 以统计日期和时间进行分区存储;

     

  • 目标表字段在源表不存在时要自动填充处理。

 

② 表分类与生命周期:

 

  • ods流水全量表:

 

  • 不可再生的永久保存;

     

  • 日志可按留存要求;

     

  • 按需设置保留特殊日期数据;

     

  • 按需设置保留特殊月份数据;

 

  • ods镜像型全量表:

     

  • 推荐按天存储;

     

  • 对历史变化进行保留;

     

  • 最新数据存储在最大分区;

     

  • 历史数据按需保留;

 

  • ods增量数据:

 

  • 推荐按天存储;

     

  • 有对应全量表的,建议只保留14天数据;

     

  • 无对应全量表的,永久保留;

 

  • ods的etl过程中的临时表:

 

  • 推荐按需保留;

     

  • 最多保留7天;

     

  • 建议用完即删,下次使用再生成;

 

  • BDSync非去重数据:

 

  • 通过中间层保留,默认用完即删,不建议保留。

 

数据质量

 

  • 全量表必须配置唯一性字段标识;

     

  • 对分区空数据进行监控;

     

  • 对枚举类型字段,进行枚举值变化和分布监控;

     

  • ods表数据量级和记录数做环比监控;

     

  • ods全表都必须要有注释;

 

2)公共维度层设计规范

 

① 设计准则

 

  • 一致性

     

共维度在不同的物理表中的字段名称、数据类型、数据内容必须保持一致(历史原因不一致,要做好版本控制)

 

  • 维度的组合与拆分

 

  • 组合原则:

 

将维度与关联性强的字段进行组合,一起查询,一起展示,两个维度必须具有天然的关系,如:商品的基本属性和所属品牌。

 

无相关性:如一些使用频率较小的杂项维度,可以构建一个集合杂项维度的特殊属性。

 

行为维度:经过计算的度量,但下游当维度处理,例:点击量 0-1000,100-1000等,可以做聚合分类。

 

  • 拆分与冗余:

 

针对重要性,业务相关性、源、使用频率等可分为核心表、扩展表。

 

数据记录较大的维度,可以适当冗余一些子集。

 

② 存储及生命周期管理

 

建议按天分区。

 

  • 3个月内最大访问跨度<=4天时,建议保留最近7天分区;

  • 3个月内最大访问跨度<=12天时,建议保留最近15天分区;

  • 3个月内最大访问跨度<=30天时,建议保留最近33天分区;

  • 3个月内最大访问跨度<=90天时,建议保留最近120天分区;

  • 3个月内最大访问跨度<=180天时,建议保留最近240天分区;

  • 3个月内最大访问跨度<=300天时,建议保留最近400天分区;

 

3)DWD明细层设计规范

 

① 存储及生命周期管理

 

建议按天分区。

 

  • 3个月内最大访问跨度<=4天时,建议保留最近7天分区;

  • 3个月内最大访问跨度<=12天时,建议保留最近15天分区;

  • 3个月内最大访问跨度<=30天时,建议保留最近33天分区;

  • 3个月内最大访问跨度<=90天时,建议保留最近120天分区;

  • 3个月内最大访问跨度<=180天时,建议保留最近240天分区;

  • 3个月内最大访问跨度<=300天时,建议保留最近400天分区;

 

② 事务型事实表设计准则

 

  • 基于数据应用需求的分析设计事务型事实表,结合下游较大的针对某个业务过程和分析指标需求,可考虑基于某个事件过程构建事务型实时表;

     

  • 一般选用事件的发生日期或时间作为分区字段,便于扫描和裁剪;

     

  • 冗余子集原则,有利于降低后续IO开销;

     

  • 明细层事实表维度退化,减少后续使用join成本。

 

③ 周期快照事实表

 

  • 周期快照事实表中的每行汇总了发生在某一标准周期,如某一天、某周、某月的多个度量事件。

     

  • 粒度是周期性的,不是个体的事务。

     

  • 通常包含许多事实,因为任何与事实表粒度一致的度量事件都是被允许的。

 

④ 累积快照事实表

 

  • 多个业务过程联合分析而构建的事实表,如采购单的流转环节。

     

  • 用于分析事件时间和时间之间的间隔周期。

     

  • 少量的且当前事务型不支持的,如关闭、发货等相关的统计。

 

4)DWS公共汇总层设计规范

 

数据仓库的性能是数据仓库建设是否成功的重要标准之一。聚集主要是通过汇总明细粒度数据来获得改进查询性能的效果。通过访问聚集数据,可以减少数据库在响应查询时必须执行的工作量,能够快速响应用户的查询,同时有利于减少不同用访问明细数据带来的结果不一致问题。

 

① 聚集的基本原则

 

  • 一致性。聚集表必须提供与查询明细粒度数据一致的查询结果。

     

  • 避免单一表设计。不要在同一个表中存储不同层次的聚集数据。

     

  • 聚集粒度可不同。聚集并不需要保持与原始明细粒度数据一样的粒度,聚集只关心所需要查询的维度。

 

② 聚集的基本步骤

 

  • 第一步:确定聚集维度

 

在原始明细模型中会存在多个描述事实的维度,如日期、商品类别、卖家等,这时候需要确定根据什么维度聚集,如果只关心商品的交易额情况,那么就可以根据商品维度聚集数据。

 

  • 第二步:确定一致性上钻

 

这时候要关心是按月汇总还是按天汇总,是按照商品汇总还是按照类目汇总,如果按照类目汇总,还需要关心是按照大类汇总还是小类汇总。当然,我们要做的只是了解用户需要什么,然后按照他们想要的进行聚集。

 

  • 第三步:确定聚集事实

 

在原始明细模型中可能会有多个事实的度量,比如在交易中有交易额、交易数量等,这时候要明确是按照交易额汇总还是按照成交数量汇总。

 

③ 公共汇总层设计原则

 

除了聚集基本的原则外,公共汇总层还必须遵循以下原则:

 

  • 数据公用性。汇总的聚集会有第三者使用吗?基于某个维度的聚集是不是经常用于数据分析中?如果答案是肯定的,那么就有必要把明细数据经过汇总沉淀到聚集表中。

 

  • 不跨数据域。数据域是在较高层次上对数据进行分类聚集的抽象。如以业务

 

  • 区分统计周期。在表的命名上要能说明数据的统计周期,如 _Id 表示最近1天,_td 表示截至当天,_nd 表示最近N天。

 

 

3、数仓命名规范

 

1)词根设计规范

 

词根属于数仓建设中的规范,属于元数据管理的范畴,现在把这个划到数据治理的一部分。完整的数仓建设是包含数据治理的,只是现在谈到数仓偏向于数据建模, 而谈到数据治理,更多的是关于数据规范、数据管理。

 

表命名,其实在很大程度上是对元数据描述的一种体现,表命名规范越完善,我 们能从表名获取到的信息就越多。比如:一部分业务是关于货架的,英文名是:rack, rack 就是一个词根,那我们就在所有的表、字段等用到的地方都叫 rack,不要叫成 别的什么。这就是词根的作用,用来统一命名,表达同一个含义。

 

指标体系中有很多“率”的指标,都可以拆解成 XXX+率,率可以叫 rate,那我 们所有的指标都叫做 XXX+rate。

 

词根:可以用来统一表名、字段名、主题域名等等。

 

举例:以流程图的方式来展示,更加直观和易懂,本图侧重 dwm 层表的命名 规范,其余命名是类似的道理:

 

图片

 

第一个判断条件是该表的用途,是中间表、原始日志还是业务展示用的表 如果该表被判断为中间表,就会走入下一个判断条件:表是否有 group 操作 通过是否有 group 操作来判断该表该划分在 dwd 层还是 dwm 和 dws 层 如果不是 dwd 层,则需要判断该表是否是多个行为的汇总表(即宽表) 最后再分别填上事业群、部门、业务线、自定义名称和更新频率等信息即可。

 

分层:表的使用范围

 

事业群和部门:生产该表或者该数据的团队

 

业务线:表明该数据是哪个产品或者业务线相关

 

主题域:分析问题的角度,对象实体

 

自定义:一般会尽可能多描述该表的信息,比如活跃表、留存表等

 

更新周期比如说天级还是月级更新

 

数仓表的命名规范如下:

 

  • 数仓层次:

 

  • 公用维度:dim

  • DM层:dm

  • ODS层:ods

  • DWD层:dwd

  • DWS层:dws

 

  • 周期/数据范围:

 

  • 日快照:d

  • 增量:i

  • 全量:f

  • 周:w

  • 拉链表:l

  • 非分区全量表:a

 

2)表命名规范

 

① 常规表

 

常规表是我们需要固化的表,是正式使用的表,是目前一段时间内需要去维护去 完善的表。

 

规范:分层前缀[dwd|dws|ads]_部门_业务域_主题域_XXX_更新周期|数据范围

 

业务域、主题域我们都可以用词根的方式枚举清楚,不断完善。

 

更新周期主要的是时间粒度、日、月、年、周等。

 

② 中间表

 

中间表一般出现在 Job 中,是 Job 中临时存储的中间数据的表,中间表的作 用域只限于当前 Job 执行过程中,Job 一旦执行完成,该中间表的使命就完 成了,是可以删除的(按照自己公司的场景自由选择,以前公司会保留几天 的中间表数据,用来排查问题)。

 

规范:mid_table_name_[0~9|dim]

 

table_name 是我们任务中目标表的名字,通常来说一个任务只有一个目标表。这里加上表名,是为了防止自由发挥的时候表名冲突,而末尾大家可以选择自由发挥,起一些有意义的名字,或者简单粗暴,使用数字代替,各有优劣吧,谨慎选择。

 

通常会遇到需要补全维度的表,这里使用 dim 结尾。

 

如果要保留历史的中间表,可以加上日期或者时间戳。

 

③ 临时表

 

临时表是临时测试的表,是临时使用一次的表,就是暂时保存下数据看看,后续一般不再使用的表,是可以随时删除的表。

 

规范:tmp_xxx

 

只要加上 tmp 开头即可,其他名字随意,注意 tmp 开头的表不要用来实际使用,只是测试验证而已。

 

④ 维度表

 

维度表是基于底层数据,抽象出来的描述类的表。维度表可以自动从底层表抽象出来,也可以手工来维护。

 

规范:dim_xxx

 

维度表,统一以 dim 开头,后面加上,对该指标的描述。

 

⑤ 手工表

 

手工表是手工维护的表,手工初始化一次之后,一般不会自动改变,后面变更,也是手工来维护。

 

一般来说,手工的数据粒度是偏细的,所以暂时统一放在 dwd 层,后面如果有目标值或者其他类型手工数据,再根据实际情况分层。

 

规范:dwd_业务域_manual_xxx

 

手工表,增加特殊的主题域,manual,表示手工维护表。

 

3)指标命名规范

 

① 公共规则

 

  • 所有单词小写

  • 单词之间下划线分割(反例:appName 或 AppName)

  • 可读性优于长度 (词根,避免出现同一个指标,命名一致性)

  • 禁止使用 sql 关键字,如字段名与关键字冲突时 +col

  • 数量字段后缀 _cnt 等标识...

  • 金额字段后缀 _price 标识

  • 天分区使用字段 dt,格式统一(yyyymmdd 或 yyyy-mm-dd)

  • 小时分区使用字段 hh,范围(00-23)

  • 分钟分区使用字段 mi,范围(00-59)

  • 布尔类型标识:is_{业务},不允许出现空值

 

作者丨园陌

来源丨公众号:五分钟学大数据(ID:LearnBigdata)

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn

最新评论
访客 2024年04月08日

如果字段的最大可能长度超过255字节,那么长度值可能…

访客 2024年03月04日

只能说作者太用心了,优秀

访客 2024年02月23日

感谢详解

访客 2024年02月20日

一般干个7-8年(即30岁左右),能做到年入40w-50w;有…

访客 2023年08月20日

230721

活动预告