为了让你搭好数仓(离线+实时),不小心又肝3w字……

园陌 2022-03-16 10:23:35

文档大纲

 

图片

 

本文上半部分之前已经发过了,传送门:数仓建设保姆级5W字教程,离线实时一网打尽(理论+实战)

 

此篇文章是整个文档的下半部分,将接着上半部分从第五章开始。

 

五、实时数仓建设核心

 

 

1、实时计算初期

 

虽然实时计算在最近几年才火起来,但是在早期也有部分公司有实时计算的需求,但是数据量比较少,所以在实时方面形成不了完整的体系,基本所有的开发都是具体问题具体分析,来一个需求做一个,基本不考虑它们之间的关系,开发形式如下:

 

图片

早期实时计算

 

如上图所示,拿到数据源后,会经过数据清洗,扩维,通过Flink进行业务逻辑处理,最后直接进行业务输出。把这个环节拆开来看,数据源端会重复引用相同的数据源,后面进行清洗、过滤、扩维等操作,都要重复做一遍,唯一不同的是业务的代码逻辑是不一样的。

 

随着产品和业务人员对实时数据需求的不断增多,这种开发模式出现的问题越来越多:

 

  • 数据指标越来越多,“烟囱式”的开发导致代码耦合问题严重。

 

  • 需求越来越多,有的需要明细数据,有的需要 OLAP 分析。单一的开发模式难以应付多种需求。

 

  • 每个需求都要申请资源,导致资源成本急速膨胀,资源不能集约有效利用。

 

  • 缺少完善的监控系统,无法在对业务产生影响之前发现并修复问题。

 

大家看实时数仓的发展和出现的问题,和离线数仓非常类似,后期数据量大了之后产生了各种问题,离线数仓当时是怎么解决的?离线数仓通过分层架构使数据解耦,多个业务可以共用数据,实时数仓是否也可以用分层架构呢?当然是可以的,但是细节上和离线的分层还是有一些不同,稍后会讲到。

 

 

2、实时数仓建设

 

从方法论来讲,实时和离线是非常相似的,离线数仓早期的时候也是具体问题具体分析,当数据规模涨到一定量的时候才会考虑如何治理。分层是一种非常有效的数据治理方式,所以在实时数仓如何进行管理的问题上,首先考虑的也是分层的处理逻辑。

 

实时数仓的架构如下图:

 

图片

 

从上图中我们具体分析下每层的作用:

 

  • 数据源:在数据源的层面,离线和实时在数据源是一致的,主要分为日志类和业务类,日志类又包括用户日志,埋点日志以及服务器日志等。

 

  • 实时明细层:在明细层,为了解决重复建设的问题,要进行统一构建,利用离线数仓的模式,建设统一的基础明细数据层,按照主题进行管理,明细层的目的是给下游提供直接可用的数据,因此要对基础层进行统一的加工,比如清洗、过滤、扩维等。

 

  • 汇总层:汇总层通过Flink的简洁算子直接可以算出结果,并且形成汇总指标池,所有的指标都统一在汇总层加工,所有人按照统一的规范管理建设,形成可复用的汇总结果。

 

我们可以看出,实时数仓和离线数仓的分层非常类似,比如 数据源层,明细层,汇总层,乃至应用层,他们命名的模式可能都是一样的。但仔细比较不难发现,两者有很多区别:

 

  • 与离线数仓相比,实时数仓的层次更少一些:

 

  • 从目前建设离线数仓的经验来看,数仓的数据明细层内容会非常丰富,处理明细数据外一般还会包含轻度汇总层的概念,另外离线数仓中应用层数据在数仓内部,但实时数仓中,app 应用层数据已经落入应用系统的存储介质中,可以把该层与数仓的表分离。

 

  • 应用层少建设的好处:实时处理数据的时候,每建一个层次,数据必然会产生一定的延迟。

 

  • 汇总层少建的好处:在汇总统计的时候,往往为了容忍一部分数据的延迟,可能会人为的制造一些延迟来保证数据的准确。举例,在统计跨天相关的订单事件中的数据时,可能会等到 00:00:05 或者 00:00:10 再统计,确保 00:00 前的数据已经全部接受到位了,再进行统计。所以,汇总层的层次太多的话,就会更大的加重人为造成的数据延迟。

 

  • 与离线数仓相比,实时数仓的数据源存储不同:

 

  • 在建设离线数仓的时候,基本整个离线数仓都是建立在 Hive 表之上。但是,在建设实时数仓的时候,同一份表,会使用不同的方式进行存储。比如常见的情况下,明细数据或者汇总数据都会存在 Kafka 里面,但是像城市、渠道等维度信息需要借助 Hbase,MySQL 或者其他 KV 存储等数据库来进行存储。

 

 

3、Lambda架构的实时数仓

 

下图是基于 Flink 和 Kafka 的 Lambda 架构的具体实践,上层是实时计算,下层是离线计算,横向是按计算引擎来分,纵向是按实时数仓来区分:

 

图片

 

Lambda架构是比较经典的架构,以前实时的场景不是很多,以离线为主,当附加了实时场景后,由于离线和实时的时效性不同,导致技术生态是不一样的。Lambda架构相当于附加了一条实时生产链路,在应用层面进行一个整合,双路生产,各自独立。这在业务应用中也是顺理成章采用的一种方式。

 

双路生产会存在一些问题,比如加工逻辑double,开发运维也会double,资源同样会变成两个资源链路。因为存在以上问题,所以又演进了一个Kappa架构。

 

 

4、Kappa架构的实时数仓

 

Kappa架构相当于去掉了离线计算部分的Lambda架构,具体如下图所示:

 

图片

 

Kappa架构从架构设计来讲比较简单,生产统一,一套逻辑同时生产离线和实时。但是在实际应用场景有比较大的局限性,因为实时数据的同一份表,会使用不同的方式进行存储,这就导致关联时需要跨数据源,操作数据有很大局限性,所以在业内直接用Kappa架构生产落地的案例不多见,且场景比较单一。

 

关于 Kappa 架构,熟悉实时数仓生产的同学,可能会有一个疑问。因为我们经常会面临业务变更,所以很多业务逻辑是需要去迭代的。之前产出的一些数据,如果口径变更了,就需要重算,甚至重刷历史数据。对于实时数仓来说,怎么去解决数据重算问题?

 

Kappa 架构在这一块的思路是:首先要准备好一个能够存储历史数据的消息队列,比如 Kafka,并且这个消息队列是可以支持你从某个历史的节点重新开始消费的。接着需要新起一个任务,从原来比较早的一个时间节点去消费 Kafka 上的数据,然后当这个新的任务运行的进度已经能够和现在的正在跑的任务齐平的时候,你就可以把现在任务的下游切换到新的任务上面,旧的任务就可以停掉,并且原来产出的结果表也可以被删掉。

 

 

5、流批结合的实时数仓

 

随着实时 OLAP 技术的发展,目前开源的OLAP引擎在性能,易用等方面有了很大的提升,如Doris、Presto等,加上数据湖技术的迅速发展,使得流批结合的方式变得简单。

 

如下图是流批结合的实时数仓:

 

图片

 

数据从日志统一采集到消息队列,再到实时数仓,作为基础数据流的建设是统一的。之后对于日志类实时特征,实时大屏类应用走实时流计算。对于Binlog类业务分析走实时OLAP批处理。

 

我们看到流批结合的方式与上面几种架构的存储方式发生了变化,由Kafka换成了Iceberg,Iceberg是介于上层计算引擎和底层存储格式之间的一个中间层,我们可以把它定义成一种“数据组织格式”,底层存储还是HDFS,那么为什么加了中间层,就对流批结合处理的比较好了呢?Iceberg的ACID能力可以简化整个流水线的设计,降低整个流水线的延迟,并且所具有的修改、删除能力能够有效地降低开销,提升效率。Iceberg可以有效支持批处理的高吞吐数据扫描和流计算按分区粒度并发实时处理。

 

六、基于Flink SQL从0到1构建一个实时数仓

 

本小节内容来自大数据技术与数仓

 

实时数仓主要解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析,实时大屏展示,实时监控报警各个场景。虽然关于实时数仓架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。接下来主要介绍Flink SQL从0到1搭建一个实时数仓的demo,涉及到数据采集、存储、计算、可视化整个流程。

 

 

1、案例简介

 

本文以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成。

 

 

2、架构设计

 

具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入Kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行join,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。

 

图片

 

 

3、业务数据准备

 

1)订单表(order_info)

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `order_info` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',  `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',  `province_id` int(20) DEFAULT NULL COMMENT '地区',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';

 

2)订单详情表(order_detail)

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `order_detail` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';

 

3)商品表(sku_info)

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `sku_info` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',  `price` decimal(10,0) DEFAULT NULL COMMENT '价格',  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',  `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',  `weight` decimal(10,2) DEFAULT NULL COMMENT '重量',  `tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',  `category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类id(冗余)',  `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

 

4)商品一级类目表(base_category1)

 

  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_category1` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(10) NOT NULL COMMENT '分类名称',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';

 

5)商品二级类目表(base_category2)

 

  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_category2` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(200) NOT NULL COMMENT '二级分类名称',  `category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';

 

6)商品三级类目表(base_category3)

 

  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_category3` (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(200) NOT NULL COMMENT '三级分类名称',  `category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';

 

7)省份表(区域表(base_region)base_province)

 

  •  
  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_province` (  `id` int(20) DEFAULT NULL COMMENT 'id',  `name` varchar(20) DEFAULT NULL COMMENT '省名称',  `region_id` int(20) DEFAULT NULL COMMENT '大区id',  `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码') ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

8)区域表(base_region)

 

  •  
  •  
  •  
  •  
  •  
CREATE TABLE `base_region` (  `id` int(20) NOT NULL COMMENT '大区id',  `region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',   PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 

 

4、数据处理流程

 

1)ods层数据同步

 

关于ODS层的数据同步这里就不详细展开。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:

 

图片

 

2)DIM层数据准备

 

本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表和商品维表。处理过程如下:

 

  • 区域维表

 

首先将mydw.base_provincemydw.base_region这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ---------------------------   省份--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_province`;CREATE TABLE `ods_base_province` (  `id` INT,  `name` STRING,  `region_id` INT ,  `area_code`STRING) WITH('connector' = 'kafka', 'topic' = 'mydw.base_province', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   省份--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_province`;CREATE TABLE `base_province` (    `id` INT,    `name` STRING,    `region_id` INT ,    `area_code`STRING,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_province', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   省份--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_provinceSELECT *FROM ods_base_province; -- ---------------------------   区域--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_region`;CREATE TABLE `ods_base_region` (  `id` INT,  `region_name` STRING) WITH('connector' = 'kafka', 'topic' = 'mydw.base_region', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   区域--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_region`;CREATE TABLE `base_region` (    `id` INT,    `region_name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_region', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   区域--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_regionSELECT *FROM ods_base_region;经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:-- ----------------------------------- DIM层,区域维表,-- 在MySQL中创建视图-- ---------------------------------DROP VIEW IF EXISTS dim_province;CREATE VIEW dim_province ASSELECT  bp.id AS province_id,  bp.name AS province_name,  br.id AS region_id,  br.region_name AS region_name,  bp.area_code AS area_codeFROM base_region br      JOIN base_province bp ON br.id= bp.region_id;这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:-- ---------------------------  一级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category1`;CREATE TABLE `ods_base_category1` (  `id` BIGINT,  `name` STRING)WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category1', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  一级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category1`;CREATE TABLE `base_category1` (    `id` BIGINT,    `name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category1', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  一级类目表--   MySQL Sink Load Data-- -------------------------  INSERT INTO base_category1SELECT *FROM ods_base_category1; -- ---------------------------  二级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category2`;CREATE TABLE `ods_base_category2` (  `id` BIGINT,  `name` STRING,  `category1_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category2', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  二级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category2`;CREATE TABLE `base_category2` (    `id` BIGINT,    `name` STRING,    `category1_id` BIGINT,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category2', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  二级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category2SELECT *FROM ods_base_category2; -- --------------------------- 三级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category3`;CREATE TABLE `ods_base_category3` (  `id` BIGINT,  `name` STRING,  `category2_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category3', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------  三级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category3`;CREATE TABLE `base_category3` (    `id` BIGINT,    `name` STRING,    `category2_id` BIGINT,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category3', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  三级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category3SELECT *FROM ods_base_category3; -- ---------------------------   商品表--   Kafka Source-- -------------------------  DROP TABLE IF EXISTS `ods_sku_info`;CREATE TABLE `ods_sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0)) WITH( 'connector' = 'kafka', 'topic' = 'mydw.sku_info', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   商品表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `sku_info`;CREATE TABLE `sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0),   PRIMARY KEY (tm_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'sku_info', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   商品--   MySQL Sink Load Data-- ------------------------- INSERT INTO sku_infoSELECT *FROM ods_sku_info;

 

经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ----------------------------------- DIM层,区域维表,-- 在MySQL中创建视图-- ---------------------------------DROP VIEW IF EXISTS dim_province;CREATE VIEW dim_province ASSELECT  bp.id AS province_id,  bp.name AS province_name,  br.id AS region_id,  br.region_name AS region_name,  bp.area_code AS area_codeFROM base_region br      JOIN base_province bp ON br.id= bp.region_id;

 

这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ---------------------------  一级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category1`;CREATE TABLE `ods_base_category1` (  `id` BIGINT,  `name` STRING)WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category1', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  一级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category1`;CREATE TABLE `base_category1` (    `id` BIGINT,    `name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category1', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  一级类目表--   MySQL Sink Load Data-- -------------------------  INSERT INTO base_category1SELECT *FROM ods_base_category1; -- ---------------------------  二级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category2`;CREATE TABLE `ods_base_category2` (  `id` BIGINT,  `name` STRING,  `category1_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category2', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  二级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category2`;CREATE TABLE `base_category2` (    `id` BIGINT,    `name` STRING,    `category1_id` BIGINT,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category2', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  二级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category2SELECT *FROM ods_base_category2; -- --------------------------- 三级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category3`;CREATE TABLE `ods_base_category3` (  `id` BIGINT,  `name` STRING,  `category2_id` BIGINT)WITH('connector' = 'kafka', 'topic' = 'mydw.base_category3', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------  三级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category3`;CREATE TABLE `base_category3` (    `id` BIGINT,    `name` STRING,    `category2_id` BIGINT,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category3', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------  三级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category3SELECT *FROM ods_base_category3; -- ---------------------------   商品表--   Kafka Source-- -------------------------  DROP TABLE IF EXISTS `ods_sku_info`;CREATE TABLE `ods_sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0)) WITH( 'connector' = 'kafka', 'topic' = 'mydw.sku_info', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   商品表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `sku_info`;CREATE TABLE `sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0),   PRIMARY KEY (tm_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'sku_info', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s'); -- ---------------------------   商品--   MySQL Sink Load Data-- ------------------------- INSERT INTO sku_infoSELECT *FROM ods_sku_info;

 

经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info,用作后续使用的维表。

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ----------------------------------- DIM层,商品维表,-- 在MySQL中创建视图-- ---------------------------------CREATE VIEW dim_sku_info ASSELECT  si.id AS id,  si.sku_name AS sku_name,  si.category3_id AS c3_id,  si.weight AS weight,  si.tm_id AS tm_id,  si.price AS price,  si.spu_id AS spu_id,  c3.name AS c3_name,  c2.id AS c2_id,  c2.name AS c2_name,  c3.id AS c1_id,  c3.name AS c1_nameFROM(  sku_info si   JOIN base_category3 c3 ON si.category3_id = c3.id  JOIN base_category2 c2 ON c3.category2_id =c2.id  JOIN base_category1 c1 ON c2.category1_id = c1.id);

 

至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。

 

3)DWD层数据处理

 

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
-- ---------------------------   订单详情--   Kafka Source-- -------------------------  DROP TABLE IF EXISTS `ods_order_detail`;CREATE TABLE `ods_order_detail`(  `id` BIGINT,  `order_id` BIGINT,  `sku_id` BIGINT,  `sku_name` STRING,  `img_url` STRING,  `order_price` DECIMAL(10,2),  `sku_num` INT,  `create_time` TIMESTAMP(0)) WITH( 'connector' = 'kafka', 'topic' = 'mydw.order_detail', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ---------------------------   订单信息--   Kafka Source-- -------------------------DROP TABLE IF EXISTS `ods_order_info`;CREATE TABLE `ods_order_info` (  `id` BIGINT,  `consignee` STRING,  `consignee_tel` STRING,  `total_amount` DECIMAL(10,2),  `order_status` STRING,  `user_id` BIGINT,  `payment_way` STRING,  `delivery_address` STRING,  `order_comment` STRING,  `out_trade_no` STRING,  `trade_body` STRING,  `create_time` TIMESTAMP(0) ,  `operate_time` TIMESTAMP(0) ,  `expire_time` TIMESTAMP(0) ,  `tracking_no` STRING,  `parent_order_id` BIGINT,  `img_url` STRING,  `province_id` INT) WITH('connector' = 'kafka', 'topic' = 'mydw.order_info', 'properties.bootstrap.servers' = 'kms-3:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;  -- ----------------------------------- DWD层,支付订单明细表dwd_paid_order_detail-- ---------------------------------DROP TABLE IF EXISTS dwd_paid_order_detail;CREATE TABLE dwd_paid_order_detail(  detail_id BIGINT,  order_id BIGINT,  user_id BIGINT,  province_id INT,  sku_id BIGINT,  sku_name STRING,  sku_num INT,  order_price DECIMAL(10,0),  create_time STRING,  pay_time STRING ) WITH (    'connector' = 'kafka',    'topic' = 'dwd_paid_order_detail',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- DWD层,已支付订单明细表-- 向dwd_paid_order_detail装载数据-- ---------------------------------INSERT INTO dwd_paid_order_detailSELECT  od.id,  oi.id order_id,  oi.user_id,  oi.province_id,  od.sku_id,  od.sku_name,  od.sku_num,  od.order_price,  oi.create_time,  oi.operate_timeFROM    (    SELECT *     FROM ods_order_info    WHERE order_status = '2' -- 已支付    ) oi JOIN    (    SELECT *    FROM ods_order_detail    ) od     ON oi.id = od.order_id;

 

4)ADS层数据

 

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

 

  • ads_province_index

 

首先在MySQL中创建对应的ADS目标表:ads_province_index