分布式实时处理系统在高性能计算场景下的应用

卢誉声 2016-11-28 10:16:53

 

本文根据DBAplus社群第82期线上分享整理而成

 

讲师介绍
 

卢誉声

Autodesk资深系统研发工程师

 

  • 《分布式实时处理系统:原理、架构与实现》作者,Hurricane实时处理系统主要贡献者,多部C++领域译作。

 

大家好,我们今天主要讨论以下几个问题:

  1. 机器学习与实时处理系统应用

  2. 分布式计算拓扑搭建

  3. 消息算法调优

  4. Hurricane计算框架与未来展望

 

一、机器学习与实时处理系统应用

 

现在我们先来看看第一部分:机器学习与实时处理系统应用。我们首先简单了解下机器学习,然后引入分布式实时处理系统的概念以及实时处理系统与机器学的关系。

 

机器学习在现实世界中的作用越来越重要。

 

机器学习的方法非常多,比如传统的知识库方法,类比方法,归纳方法,演绎方法等各种方法。

 

目前在大多数领域中应用最多的当属归纳学习方法。

 

在通常的归纳型机器学习中,我们的目标是让计算机学习到一个“模型”(这种模型是人类预先组织好的,有固定的数据结构和算法等等),然后我们就可以用这个“模型”来进行“预测”。 预测就是从现实中输入一些数据,通过学习到的模型进行计算,得到的输出。我们希望这个模型可以在很高的概率下输出一个和真实结果差距不大的结果。

 

一旦我们得到了这个模型,我们可以使用该模型处理输入数据,得到输出数据(即预测结果),而归纳性机器学习的任务就是学习中间的这个模型。

 

如果我们将这个模型看成一个函数,那么我们可以认为归纳性机器学习的目的就是学习得到一个函数F,如果该函数的参数为x,输出为y。那么我们希望学到的东西就是 y = F(x) 中的F。

 

我们先用一个最简单的例子来讲一下:

 

假设我们现在不知道一个物体自由落体速度的计算公式,需要学习如何预测一个物体的自由落体速度 ,机器学习的第一步就是收集数据 。

 

假设我们可以测量出物体下坠的任何时间点的速度,那么我们需要收集的数据就是某个物体的下坠时间和那个时间点的速度 。

 

现在我们收集到一系列数据:

时间    物体速度

1       9.7

2       20.0

3       29.0

4       39.9

5       49.4

6       58.5

7       69.0

8       78.8

9       89.0

 

我们这里给出两个假设。第一个假设是,一个物体自由落体的速度只和时间有关系 第二个假设是,我们可以使用一个简单的“模型”:一元一次函数得到物体的速度。(即 F(x) = ax + b)

 

在这个问题中,a、b 这就是这个模型待学习的“参数”。

 

现在的问题就是——我们需要用什么策略来学习这些参数?因为我们可以遍历的数值空间是无穷大的,因此我们必须采用某种策略指导我们进行学习。

我们就用非常朴素的思想来将解决这个问题吧。

 

在正式学习前,我们先将收集的数据分成两组:一组是“训练数据”,一组是“测试数据”。

假设训练数据是:

1       9.7

2       20.0

3       29.0

4       39.9

5       49.4

6       58.5

测试数据是:

7       69.0

8       78.8

9       89.0

 

我们需要根据训练数据计算出我们的参数a和b。然后使用我们计算出来的a和b预测测试数据,比较F(x)和实际数据的差距 。

 

如果误差小到一定程度,说明我们学习到的参数是正确的。

 

比如和实际数据的差距都小于5% 。

 

如果满足条件说明参数正确,否则说明参数不够精确,需要进一步学习,这个差距,我们称之为误差(Loss) 。

 

现在我们来看一下在这个模型(简单的一元一次线性函数)下如何学习这两个参数:

 

比如我们可以采用这种学习策略 1.首先a和b都假定为整数,假定a的范围是[-10, 10]这个区间,b的范围是[-100, 100]这个区间 2.遍历所有的a和b的组合,使用a和b计算ax + b,x取每个训练数据的输入数据,评估计算结果精确性的方法是计算结果和训练数据结果的差的绝对值除以训练数据结果,也就是 Loss = |F(x) - Y| / Y 3.计算每个组合的Loss的平均值,取平均Loss最小的为我们假定的“学习结果” 。

 

现在我们就得到了a和b,并且这个a和b是在我们给定范围里精度最高的参数,我们用这个a和b去训练数据里面计算平均的 |F(x) - Y| / Y 。如果平均Loss小于 5%,说明这个a和b是符合我们精度的, 否则我们需要优化我们的学习策略。

 

这种朴素的基于归纳学习的机器学习方法可以分为以下几步:

  1. 预先定义一个模型

  2. 根据模型制定学习策略

  3. 使用学习策略使用模型来学习(拟合)训练数据,得到该模型中的所有参数

  4. 使用测试数据评估模型是否精确。如果不够精确则根据学习策略继续学习。如果足够精确,我们就认为机器学习结束了。

  5. 最后我们可以得到模型和参数,这就是我们学到的结果,也就是那个用来预测的函数。

 

 这里我们也要注意,上述步骤的前提是我们的模型是可以收敛的,如果模型本身就是发散的,那么我们就永远得不到我们的结果了。

 

机器学习与实时处理系统

 

传统的机器学习是一种批处理式的方法,在这种方法下,我们需要预先准备好所有的训练数据,对训练数据进行精心组织和筛选,很多情况下还需要对数据进行标记(监督式学习),而训练数据的组织会对最后的训练结果产生相当大的影响。

 

在这种算法中我们要处理完所有数据后才能更新权重和模型。

 

但现在出现了许多在线学习算法,这种算法可以对实时输入的数据进行计算,马上完成权重和模型更新。

 

一方面我们可以用于监督式学习(完成数据标记后马上加入训练),也可以用于大量数据的非监督式学习。

 

而在这种情况下,实时处理系统就可以大展身手了。在线系统和实时处理系统可以确保实时完成对数据的学习,利用实时新系统。

 

实现思路如下图所示:

这里我们可以看到,系统接收来自其他系统的实时输入,然后实时处理系统中使用在线算法快速处理数据,实时地更新模型权重信息。

 

纯粹的在线算法可能并不适合许多情景,但是如果将部分在线算法和传统的批处理式算法结合,将会起到非常好的效果。而且许多数据分析工作确实可以通过这种方式完成一部分处理,至少是预处理。

 

目前机器学习的趋势就是对精度和速度的要求越来越高,方法越来越复杂,而数据越来越多,计算量越来越大,如果没有足够的计算结果,不一定能够在有限时间内完成足够的学习,因此现在类似于Tensorflow之类的机器学习解决方案都会提供针对分布式的支持。而大数据场景下的机器学习也变得越来越重要,这也对我们的分布式计算与存储方案提出了严峻的挑战。

 

二、分布式计算拓扑搭建

 

现在我们来看一个现实工程中常常会遇到的问题。

 

我们在开发实际系统时常常会收集大量的用户体验信息,而我们常常需要对这些体验信息进行筛选、处理和分析。那么我们应该如何搭建一个用于实时处理体验信息的分布式系统呢?

 

我们先来看一下整体流程:

 

 

  • 收集体验信息
    业务系统调用体验信息接口,将体验信息信息异步写入到特定的文件当中。使用永不停息的体验信息检测程序不断将新生成的体验信息发送到数据处理服务器。

  • 处理体验信息

    首先数据处理服务器的体验信息接收负责将体验信息写入本地的Redis数据库中。然后我们使用消息源从Redis中读取数据,再将数据发送到之后的消息处理单元,由不同的数据处理单元对体验信息进行不同处理。

  • 存储结果

    消息处理单元完成体验信息处理之后,将体验信息处理结果写入到Cassandra数据库中,并将体验信息数据写入到Elasticsearch数据库中。

 

其中关键的部分就是图中用长方形框出来的部分,该部分的作用是完成对数据的筛选、处理和基本分析。这部分我们将其称作计算拓扑,也就是用于完成实际计算的部分。

 

我们接下来阐述一下每一步具体如何做。

 

收集体验信息

 

收集体验信息分为以下几步:

  • 程序通过体验信息接口将体验信息写入体验信息文件中。我们假设程序会使用非阻塞的异步写入接口,体验信息接口的调用方只是将体验信息送入某个队列中,然后继续向下执行。

  • 接着体验信息写入线程从消息队列中读取数据,并将体验信息数据写入到真正的体验信息文件中。

  • 写入后,某一个体验信息代理程序会不断监视体验信息文件的改动,并将用户新写入的体验信息信息发送到体验信息处理服务器的体验信息收集服务接口上。

  • 体验信息收集服务接口是整个服务的对外接口,负责将其他节点发送的体验信息信息送入集群内部的Redis节点,并将体验信息数据写入到Redis的列表中。至此为止,体验信息收集过程就完成了。

 

处理体验信息

 

接下来是处理体验信息,处理体验信息主要在计算拓扑中完成。分为四步:

  • 体验信息处理消息源:负责监视Redis列表的改变,从Redis列表中读取体验信息规则,并将体验信息规则文本转换成计算拓扑的内部数据格式,传送到下一个体验信息处理单元。

  • 体验信息规则引擎:使用体验信息规则引擎对体验信息进行处理和过滤。这一步是可选的,也就是用户可以加入自己的消息处理单元对收集的体验信息进行处理。这将会影响到发送到后续的消息处理单元(索引器和计数器)中的体验信息消息。这一步我们就不做处理了,如果读者感兴趣可以自己加入一个或者多个消息处理单元对体验信息进行处理。

  • 索引:这一步必不可少,用于将体验信息规则引擎输出的体验信息写入到ElasticSearch中,并便于用户日后检索这些体验信息。这里涉及到一步——将体验信息规则元组转换成JSON,并将JSON写入ElasticSearch。

  • 统计:这一步也非常重要,用于对体验信息进行计数,这一步会将体验信息计数结果写入Cassandra的对应表中。便于用户获取统计信息。

 

存储结果

 

最后就是对计算结果的存储,我们需要使用存储模块将数据写入到不同的数据库中:

  • ElasticSearch:该数据库用于存储被转换成JSON的原始体验信息信息。用户可以在ElasticSerach中检索体验信息。

  • Cassandra:该数据库用于存储体验信息的统计计算结果。因为Cassandra支持原子计数列,因此可以非常胜任这个工作。

 

我们可以发现,在上面几步中,其他都可以使用现成的系统来完成任务,最关键的部分就是计算拓扑,计算拓扑需要高实时性地完成体验信息处理分析任务,这样才能应付大型系统中以极快速度产生的大量体验信息。

 

这里我们可以使用一个独立的计算集群来完成这个事情。每个计算节点负责完成一个计算任务,完成之后将数据传送给下一个计算节点完成后续的计算任务。每个计算节点都有一个消息队列用于接收来自上一级的消息,然后处理消息并继续将结果发送给下一级的计算节点。

 

这里我们通常关心三个问题:

  1. 如何确保所有数据都得到了处理。

  2. 如何组织消息(数据)的传递,为整个集群高效计算提供一个良好的I/O支持。

  3. 如何搭建这个计算拓扑并尽量高效地进行完成计算。

 

三、消息算法调优

 

1、如何确保所有数据都得到了处理

 

我们先来看一下如何解决解决数据的完全处理问题。

 

我们这里讲每一个需要处理的数据(一条体验信息记录)组织成一个Tuple,也就是元组。每个计算节点都以Tuple为单位进行数据处理。每个元组都会有一个ack方法,用于告知上一级计算节点该Tuple已经处理完成。

 

我们以下面的方式处理Tuple,保证所有数据都会被完全处理:

  1. 首先给每个Tuple一个id(伪随机的64位id)。

  2. 由消息源发出的Tuple会有一个Acker,构造Tuple的时候会把新的Tuple加入这个Acker(就是包含这个Acker)。

  3. 每个节点处理完一个元组调用元组的ack方法,改变Acker内部的记录值,表示当前Tuple已经完成处理。

  4. 如果某个Acker中的所有Tuple都已经处理完成,那么这个Spout Tuple就已经处理完成。表明该消息源发出的Tuple被完全处理。

  5. 由于我们无法在Acker中记录下Tuple树,因此比较好的方式是实现一个基于异或的优化算法,该算法在Storm中得到了应用。其具体实现是:在Acker中设置一个ack id,每创建一个Tuple,将id与其异或,每ack一个Tuple时,将其与id做异或运算。这样当所有Tuple处理完成后,ack id为0,就可以知道所有元组处理完成。

  6. 如果消息源检测到某个其发出的Tuple没有在特定时间内得到处理,就会重发该元组。后续的计算节点重新开始处理。为了实现一个同时符合CAP的分布式系统,我们这里后续的计算节点并不会缓存计算结果,而是会重新开始计算上一级节点重发的元组,具体为什么这样做请参见How to beat the CAP theorem。

 

2、数据流量控制问题

 

我们可以设想一下,如果网络状况不好,在特定时间内有许多元组都没有得到处理,那么数据源节点就会重发许多Tuple,然后后续节点继续进行处理,产生更多的Tuple,加上我们需要正常处理的Tuple,使得集群中的Tuple越来越多。而由于网络状况不好,节点计算速度优先,会导致集群中积累的过多数据拖慢整个集群的计算速度,进一步导致更多的Tuple可能计算失败。

 

为了解决这个问题,我们必须想方设法控制集群中的流量。

 

这个时候我们就会采用一种流量背压机制。该机制借鉴自Heron。

 

这个机制的思想其实很简单,当每个计算节点处理 Tuple过慢,导致消息队列中挤压的Tuple过多时会向其他节点发送消息,那么所有向该节点发送消息的节点都会降低其发送消息的速度。经过逐级传播慢慢将整个集群的流量控制在比较合理的情况下。只不过这个算法具体如何实现有待我们继续研究。

 

3、如何搭建这个计算拓扑,尽量高效地进行完成计算

 

最后就是如何搭建这个拓扑,并尽量高效地完成计算了。

 

在分布式实时处理系统领域,目前最为成功的例子就是Apache Storm项目,而Apache Storm采用的就是一种流模型。而我们的Hurricane则借鉴了Storm的结构,并进行了简化(主要在任务和线程模型上)。

 

这种流模型包括以下几个概念:

  1. 拓扑结构:一个拓扑结构代表一个打包好的实时应用程序,相当于Hadoop中的一个MapReduce任务。但是和MapReduce最大的不同就是,MapReduce最后会停止,相当于任务处理结束,而拓扑结构则会持续执行,永不停息,除非你手动停止。因此任何时刻流入的数据流都会被拓扑结构迅速处理。

  2. 流:一个流是拓扑结构中由元组组成的无限的序列,通常是由一个元组经过不同的处理单元处理之后产生的。每一个流入拓扑结构中的数据都会产生一个流。

  3. 元组:元组是在流中传输的数据,数据源会将输入的数据转成元组输入到拓扑结构中,而数据处理单元会处理上一级的元组并产生新的元组传给下一级的数据处理单元。元组中支持存储不同类型的数据。

  4. 消息源:消息源是拓扑结构中数据流的源头。通常其任务是读取外部数据源输入,并产生元组输入拓扑结构中。可靠的数据源可以确保消息完全得到处理,并在合适的时候重发元组。

  5. 数据处理单元:数据处理单元是拓扑结构中负责处理数据的部分,你可以在其中筛选数据,统计数据,拼接数据等等。

  6. 数据处理单元会接收来自上一级的元组,并经过处理得到下一级的元组。每个数据处理单元会向上一级确认其元组有没有得到正确处理,如果数据源发现固定时间内并不是全部元组都被处理完了,就会重发元组。

 

为了支撑这套模型,我们设计了Hurricane的架构,该架构如下图所示:

 

其中有以下几个组件:

  1. President:该组件是一个服务,是整个集群的核心,负责完成整个集群的调度和管理。当你需要启动一个任务时,该节点会读取整个进去的信息,并将任务合理分配给各个计算节点。

  2. Mananger:每个计算节点都有一个Mananger服务。该服务负责接收来自President的消息,并将任务交给具体的Executor进行处理。当处理完成一个Tuple后会将Tuple发给下一个Manager进行处理(发送给那个Manager会在计算任务启动时由President指定)。

  3. Executor:每个Executor是一个线程,该线程会启动一个消息循环,接收来自Manager的消息,每接收到一个消息,就会调用Executor内的Task完成处理。每个Executor会包含一个Task,也就是一个计算处理任务。

  4. Task:计算处理任务,可能是产生Tuple的消息源,也可以能是对Tuple进行处理的消息处理单元,每个Executor都会包含一个Task。而Storm中,一个Executor中会包含多个Task,我们模仿JStorm改造了这个模型,主要可以简化Task的管理和任务调度,而在JStorm中也证实这样并不会降低集群的处理能力。

 

四、展望

 

目前我们已经基本实现了这个架构并且能保证处理简单的计算任务。我们需要在之后的时间中继续完善这个架构和机制,完善并优化我们的系统实现,比如完全实现高层抽象Squared和保序机制等,让我们的系统能更接近切合实际的工程应用,而不是一个设想的空中楼阁。

 

除此以外,由于现在有许多计算任务需要使用基于向量和矩阵的浮点计算,因此我们计划开发一个Hurricane的子项目——SewedBLAS。这是一个BLAS库的高层抽象,我们希望整合大量的BLAS库,比如使用CPU的MKL/OpenBLAS,使用GPU的CUDA和ACML,构建一个易于使用、跨平台的高性能线性代数库,并与Hurricane进行深度整合,力求在分布式和科学计算、深度学习找到最好的切合点,并充分吸收整合其他现有的分布式机器学习框架,减少从科研到产品的转换难度。

 

目前该框架可以在 http://github.com/samblg/hurricane 找到。欢迎大家一起讨论。

最新评论
访客 2024年04月08日

如果字段的最大可能长度超过255字节,那么长度值可能…

访客 2024年03月04日

只能说作者太用心了,优秀

访客 2024年02月23日

感谢详解

访客 2024年02月20日

一般干个7-8年(即30岁左右),能做到年入40w-50w;有…

访客 2023年08月20日

230721

活动预告