Flink替代Spark Streaming怎么样?本篇给你答案

孙赵宏 2019-04-04 09:45:15
作者介绍

孙赵宏,2018年4月加入去哪儿网,后端大数据研发工程师,目前在大住宿事业部/公共技术中心负责用户基础行为数据工程的研发。

 

本文会主要介绍下基于 Flink 构建用户实时基础行为工程的相关实践,包括 Flink 相关的技术点和基础行为实时工程的业务。 

 

Flink 是目前 Qunar 主推的实时数据处理开源平台,用于替代 Spark Streaming。

 

如果你们使用 Flink 也是和我们之前一样,不知道如何使用 Flink 实时计算平台,或者不知道该怎样合理利用其 Features 去更好构建地工程;再或者你想了解每天处理超过12亿条实时数据,数据实时性达到秒级,QPS 可支持10万的用户实时基础行为工程的技术实现,在后面应该能找到答案。

 

一、Flink简介

 

Apache Flink 是一个面向数据流处理和批量数据处理的分布式的开源计算框架,能够支持流处理和批处理两种应用类型。有着低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理的特点。

 

Flink 完全支持流处理,也就是说作为流处理看待时,输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。这与 Spark streaming 不同,Spark streaming 是将流处理视为无限个有界的批处理(microbatch)。

 

1、Flink 特点

 

  • 有状态计算的 Exactly-once 语义。状态是指 flink 能够维护数据在时序上的聚类和聚合,同时它的 checkpoint 机制可以方便快速的做出失败重试; 

  • 支持带有事件时间(event time)语义的流处理和窗口处理。事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下; 

  • 支持高度灵活的窗口(window)操作。支持基于 time、count、session,以及 data-driven 的窗口操作,能很好的对现实环境中的创建的数据进行建模;

  • 轻量的容错处理(fault tolerance)。它使得系统既能保持高的吞吐率又能保证 exactly-once 的一致性。通过轻量的 state snapshots 实现;

  • 支持高吞吐、低延迟、高性能的流处理;

  • 支持 savepoints 机制(一般手动触发),可以将应用的运行状态保存下来;在升级应用或者处理历史数据上,能够做到无状态丢失和最小停机时间;

  • 支持大规模的集群模式,支持 yarn、Mesos。可运行在成千上万的节点上;

  • 支持具有 Backpressure 功能的持续流模型; 

  • Flink 在 JVM 内部实现了自己的内存管理,包括完善的内存架构和 OOM error prevention;

  • 支持迭代计算;

  • 支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果进行缓存。

 

2、Flink 分布式 runtime

 

 

JobManager 主要工作是协调分布式系统的运行。比如协调各个任务的执行时间,管理 checkpoint 和协调异常状态的恢复等。

 

TaskManager 是任务的真正执行者,包括数据流的缓存和交换等操作。

 

client 不是 Flink Runtime 的一部分,也不参与任务的真正执行,只是用来启动 Job 时生成执行计划并交给 JobManager。

 

二、Flink流式(DataStream)编程模型

 

1、编程抽象层级

 

 

最底层为有状态的流,通过处理函数进入 DadaStream 的 API 处理层。DataStream API 层也叫核心 API 层,一般大部分编程工作都集中于此,包括业务处理、聚合、关联等逻辑操作。

 

Table API 层和 SQL 层其实是可以提供基于 schema 的 SQL 查询形式,目前较少使用。

 

2、流式编程

 

 

Flink 编程模型中三大元素分别是 Source、Operator 和 Sink。Flink 的流起始于 source,经过 Transformation Operator 对流进行处理,最后在 Sink 进行持久化。

 

三、用户实时基础行为工程简介

 

1、用户基础行为工程架构

 

 

首先使用 Flink 订阅各个业务线日志的 Kafka 集群 Topic,实时数据进入 Flink 集群中运行的各个业务线对应的 Job 进行数据清洗,为保证实时性和增加系统吞吐量,直接按照业务线为 Key 存入 Redis 中。

 

Server 端按照 Gid 和 Username 从 Redis 中取出此用户所有业务线的所有行为并与离线数据合并,通过 dubbo 接口返回给客户端。为了减小服务的压力,数据的截取、解压缩等耗 CPU 资源的操作都在客户端进行。

 

2、基础用户行为工程的意义

 

目前我们提供的服务是提供一个用户100天内的实时行为,包括了机票、酒店、火车票、门票、度假、车票、大搜等业务线的搜索、点击、收藏、订单、预定等行为,数据源为 Hotdog 日志,Kylin 日志,业务线日志等。

 

为了补充度假,门票和景点的商品信息,使用了skuvacationinfo,skuticketinfo,skusightinfo 这三个 SKU 库。 

 

基于用户行为,我们可以做精细化留存评估,让留存数据更有价值和指导意义;也可以进行质量评估,需要基于用户行为并且贴合业务去评估。比如某个景点的搜索点击行为在同类中较高,那我们就可以认为此景点为优质景点从而增加此景点的推荐权重等。

 

用户行为也可以用作产品分析,用数据量化产品核心功能,让产品迭代排期更科学,部门配合更高效。

 

用户行为还可以更好实现用户分群、用户分层等精准营销。现在基础用户行为工程已经服务于首页所有的推荐场景。用户实时行为服务已经广泛用于定向广告、首页预制词、单品推荐、目的地推荐等多个场景。

 

我们在个性化推荐场景中进行了测试,使用实时行为服务比使用T-1日行为数据,点击率提升20%。

 

四、DataStream的典型算子(Operator)

使用举例

 

1、使用Filter对流进行过滤

 

某些时候一种日志流中包含了许多种不同行为类型的日志,但业务处理时我们只需要对一种行为类型的日志进行清洗,这时我们可以使用 DataStream 的 Filter 来对数据流进行过滤。

 

 

dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0;

}

});

 

2、使用Split和select对流进行拆分

 

实际业务中经常会遇到一种日志包含了多种不同的业务或者行为,但我们想将不同的业务分流后分别处理,这就使用到了分流 split 算子。

 

 

SplitStream<Integer> split = someDataStream.split( new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) {

        List<String> output = new ArrayList<String>(); if (value % 2 == 0) {

            output.add("even");

        } else {

            output.add("odd");

        } return output;

    }

});

 

分流过后,再使用 select 算子进行拆分:

 

 

SplitStream<Integer> split;

DataStream<Integer> even = split.select("even");

DataStream<Integer> odd = split.select("odd");

DataStream<Integer> all = split.select("even","odd");

 

逻辑非常的清晰,实现非常简单。

 

3、使用 Join 实现双流的聚合

 

当我们要实现两个数据流中的数据关联的时候,我们可能想到使用 Redis 等缓存中间件对中间数据进行缓存,幸运的是 Flink 的算子中已经提供了两条流进行关联的操作 Join。

 

Join 操作和 SQL 的 Join 道理是一样的,需要用 where 指定 Join 的字段和用 equalTo 指定 Join 的条件,最后使用 apply 对 Join 成功的结果进行处理。节约了中间件的资源。

 

 

dataStream.join(otherStream)

    .where(<key selector>).equalTo(<key selector>)

    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    .apply { ... }

 

五、容错策略

 

1、异常重试策略

 

特殊情况下(如遇上无法解析的数据或者出现未知异常)导致任务执行失败,Flink 会根据自己的 checkpoints 来进行自动重启恢复。 

 

Flink 的 checkpointing 机制会存储状态的一致性快照,配置了不同的状态存储策略,checkpoints 就会保存在不同的地方,比如 JM 的内存,文件系统或是数据库。

 

当前我们设置5秒触发一次 checkpoint 保存,为了节约集群内存资源我们选择保存的位置为 HDFS。

 

重启的策略支持自定义,集群的重启策略可以通过 flink-conf.yaml的restart-strategy 来进行集群级别的控制,也可以在 Job 级别进行设置,分为固定延迟重启策略(Fixed Delay Restart Strategy)和失败率重启策略(Failure rate Restart Strategy):

 

  • 固定延迟重启策略会尝试一个给定的次数来重启 Job,如果超过了最大的重启次数,Job 最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。

  • 失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。 

 

当前我们使用 Job 级别的设置,重启1次,间隔10秒。

 

2、停机恢复策略

 

任务有更新时,Flink 版本升级时,系统升级或系统迁移等情况下,需要停掉 running 状态的 Job,此时如何保证正在处理的数据不会丢失?

 

这就用到了 Flink 的另一个特性 Savepoint。

 

不同于 checkpoint 的自动触发机制,Savepoint 是手动触发的。Savepoint 为全局一致性快照,可以保存数据源 offset,operator 操作状态等信息。 

 

1)停止 job 并保存 savepoit

 

 
./bin/flink cancel -s [savepointDirectory] <jobID>

 

2)启动 job 并开启 savepoint

 

 
./bin/flink run -s <savepointPath> ...

 

3)使用 WEBUI 启动 Job 并输入 savepoint 路径

 

 

3、监控告警

 

Flink 提供了 Metrics,相当于我们的 Qmonitor 来对各项指标进行监控,API 丰富且使用简单。监控的指标可以投射到 Watcher上面。

 

目前我们所有的任务包括了“单位时间接收的数据”、“单位时间处理失败的数据”、“数据从产生到进入 Flink 的延时”、“数据在 Flink 处理的实际时间”、“单位时间持久化成功率”等监控。

 

六、可用性

 

1、HA

 

目前实时处理使用的是公司 Flink 集群,4台 taskmanager,16个 slot。DB 使用的是 mysql-mmm 高可用方式。Redis 使用10个实例的集群。Server 使用8台虚机做的 NG。没有单点问题。

 

2、使用 Flink Backpressure 应对流量洪峰

 

Flink 自带背压感知功能,我们不用手动去缓存过剩的消息,Flink 会自动控制消费速度。其实现方式印证了“最简单的办法往往最有效”这个道理。

 

Flink 使用分布式阻塞队列来作为有界缓冲区。如同 Java 里通用的阻塞队列跟处理线程进行连接一样,一旦队列达到容量上限,一个相对较慢的接受者将拖慢发送者。

 

 

  • 记录“A”进入Flink,然后被Task 1处理;

  • Task 1处理后的结果被序列化进缓冲区;

  • task 2从缓冲区内读取一些数据,缓冲区内将有更多的空间;

  • 如果task 2处理的较慢,task1的缓存区将很快填满。发送速度随之下降。 我们可以通过 WEBUI 来查看 Flink 各个 operator 的背压状态。

 

 

3、使用 EventTime、Window 和 WaterMark 处理无序流量(数据延时)

 

实际处理实时数据的过程中,由于日志收集和传递发送过程中,难免会在时间上乱序,就导致在处理前后有依赖性或者关联性的数据的时候出现问题。Flink 可以使用 EventTime 和 WaterMark 优雅的处理无序流量问题。

 

1)Time

 

在 flink 中元素可以设置3种不同的时间模型:

 

  • Processing time此时间为元素进入 operator 后被赋予的当前算子所在服务器的本地时间戳。简单说就是算子时间。

  • Event time此时间为元素真正的产生时间(例如日志内容中的时间戳),所以通常情况下需要我们从原始的日志内容中提取出来。

  • Ingestion time此时间的赋值是在元素从 source 发出,刚进入 operator 时此时的服务器本地时间戳。

 

 

从上面的简介中可以看出,如果我们想让保证元素的顺序与其最初产生的顺序一致,我们需要使用 EventTime 时间模型。

 

2)Window

 

Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的缓存区,我们在这些缓存区中对数据进行处理和计算,是一个相对比较好理解的概念。

 

Window 分为滚动窗口、滑动窗口、会话窗口、全局窗口等,可以根据业务需求去选择使用。由于篇幅原因,这里暂时不详细介绍各自的原理和使用。

 

3)Watermark

 

首先介绍一下 watermark 的由来。

 

当我们使用 window 去处理 EventTime 的乱序流时,难免会遇到延迟的元素,但我们又不想无限期的等下去,所以我们想要这么一种方式去告诉 window 停止等待,马上进入计算,watermark 就是做此工作的。 

 

Watermark 是衡量 EventTime 的流进度的一种方式。我们可以把水印视为 flink 插入流中的一个元素,它也拥有一个时间戳,只是这个元素不会像普通元素那样被做逻辑处理。

 

 

以上图为例,W(11)和W(17) 是两个 watermark 分别携带的时间戳为11和17,当算子处理到W(11)时,首先它先识别出这是一个watermark 对象,然后它会知道时间戳小于11的元素已经不会再进入此 window 了,于是触发当前 window 进行计算并将11缓存到自己的状态中。

 

4、编程实现

 

对 EventTime、Window 和 WaterMark 的概念的介绍完后,我们来了解下实践中这三者是如何进行配合来处理乱序流的。

 

1)首先开启 EventTime 时间模型

 

 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

我们需要告诉 Flink 我们需要使用 EventTime 的元素时间模型。

 

2)设置 window

 

 
ds.window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1)))

 

这里我们例子中选用的滑动窗口,根据业务调整窗口的大小和窗口滑动的距离。

 

3、设置 watermark

 

设置 watermark 有两种方式:

 

  • 1)AssignerWithPeriodicWatermarks:定时更新的 watermark。

  • 2)AssignerWithPunctuatedWatermarks:每个元素到来都要设置一个 watermark。虽然后者更精准,但是大数据量的情况下会影响性能,一般使用前者。

 

 

       static AssignerWithPeriodicWatermarks assigner = new AssignerWithPeriodicWatermarks<String>() { 

private final long maxTimeLag = 60000;//60 secs @Nullable  @Override 

public Watermark getCurrentWatermark() { //设置允许延后60秒  return new Watermark(System.currentTimeMillis() - maxTimeLag);

       } 

@Override

 public long extractTimestamp(String logStr, long l) {

            JsonObject jsonObject =

                 (JsonObject) new JsonParser().parse(logStr).getAsJsonObject(); return jsonObject.get("timestamp").getAsLong();

        }

    };

}

 

重写 getCurrentWatermark()来设置生成 watermark 的方式。重写 extractTimestamp()来提取元素的 eventtime。在WEBUI 上可以实时查看 watermark 状态。

 

 

4、性能扩展

 

当业务扩展时,只需要申请新的 taskmanager,扩容 Redis 实例,申请虚机,然后提交新的 Job 即可,水平扩展非常简单方便,完全不影响在运行的业务。

 

作者:孙赵宏

来源:Qunar技术沙龙订阅号(ID:QunarTL)

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn

最新评论
访客 2023年08月20日

230721

访客 2023年08月16日

1、导入Mongo Monitor监控工具表结构(mongo_monitor…

访客 2023年08月04日

上面提到: 在问题描述的架构图中我们可以看到,Click…

访客 2023年07月19日

PMM不香吗?

访客 2023年06月20日

如今看都很棒

活动预告