一篇文带你快速起步Apache Storm

杜亦舒 2017-07-11 14:39:57

作者介绍

杜亦舒创业中,技术合伙人,喜欢研究分享技术。个人订阅号:性能与架构。

 

本文介绍了 Apache Storm 的基本原理和开发方法,包括一个 PDF 和 2 个示例的源码。

 

内容大纲:

 
  1. Storm是什么

  2. 应用场景

  3. Storm与Hadoop的关系

  4. Storm怎么用

  5. 示例1:统计单词出现的次数

  6. 核心概念

  7. 集群架构

  8. 示例2:统计通话记录

 

1

 

Storm是什么

 

Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。

 

Storm 是流式框架,有很高的数据吞吐能力,Strom 本身是无状态的,通过 ZooKeeper 管理分布式集群环境和集群状态。

 

Strom 的安装和使用都很简单,但功能强大,可以并行地对实时数据流进行各种处理。

 

2

 

应用场景

 

应用 Storm 的场景例如:

 

  • 日志处理

 

监控系统中的事件日志,使用 Storm 检查每条日志信息,把符合匹配规则的消息保存到数据库。

 

  • 电商商品推荐

 

后台需要维护每个用户的兴趣点,主要基于用户的历史行为、查询、点击、地理信息等信息获得,其中有很多实时数据,可以使用 Storm 进行处理,在此基础上进行精准的商品推荐和放置广告。

 

3

 

Storm与Hadoop的关系

 

Storm 与 Hadoop 都用来处理大数据,那么它们的关系是怎样的呢?

 

Hadoop 是强大的大数据处理系统,但是在实时计算方面不够擅长;Storm的核心功能就是提供强大的实时处理能力,但没有涉及存储;所以 Storm 与 Hadoop 即不同也互补。

 

它们的最主要的区别例如:

 

  1. Storm 是实时流处理模式,Hadoop 是批处理模式;

  2. Storm 就像一条川流不息的河流,只要不是意外或者人为停止,它就会一直运行,Hadoop 是在需要时执行 MapReduce 任务,执行完成后停止;

  3. 在处理时间上,Storm 每秒可以处理数万条消息,HDFS+MapReduce 处理大量数据时通常需要几分钟到几小时。

 

4

 

Storm 怎么用

 

Storm 非常简洁,为了便于理解,先看 2 个最核心的概念:

 

  1. 数据源头 Spout

  2. 数据处理单元 Bolt

 

先把 Spout 与外部数据进行对接,这样就可以把外面的数据量引到 Storm 中了。Spout 接收到数据后就会发给 Bolt,这就需要告诉 Bolt 如何处理,处理完成后把数据放到哪儿,例如数据库。

 

这就是一个最简单的模型:

 

 

而其中 Bolt 可以有多个,这就使得 Storm 强大而灵活。

 

 

可以看出 Storm 就是一个有拓扑图,点是 Spout 或者 Bolt,边是数据流向。

 

 

所以,使用 Storm 时需要做的就是把整个拓扑图构造出来:

 

  1. 定义数据从哪儿来

  2. 定义数据流向和处理单元的逻辑

  3. 定义数据到哪儿去

 

5

 

示例1 统计单词出现的次数

 

以大数据中的helloworld “词频统计” 为例来学习 Storm 的开发方法。

 

1、实现思路

 

构造拓扑结构:

 

  • Spout 读取一行文本,向下游发送;

  • 把句子分割为单词的 Bolt,从 Spout 发出的数据中取得一行文本,然后分割为一个个的单词,把每个单词向下游发送;

  • 单词计数的 Bolt,接收分词 Bolt 发出的单词,对每个单词出现的次数进行统计,然后把单词及其次数作为一个数据单元向下游发送;

  • 结果输出的 Bolt,接收计数 Bolt 发出的数据单元,取出单词及其次数进行汇总,最后输出统计结果;

  • 创建拓扑对象,把 Spout 和各个 Bolt 连接起来,然后提交到 Storm 中执行。

 

2、代码实现

 

创建项目目录 storm-wordcount

 

项目目录下新建 maven 工程文件 pom.xml

 

 

项目根目录下执行 maven 命令,安装依赖

 

mvn package

mvn dependency:tree

 

项目根目录下创建源码目录 src/main/java

然后在 java 目录下创建包目录 com/storm/test

 

现在项目的目录结构如下:

 

 

具体代码如下:

 

SpoutWords.java

 

 

BoltSplit.java

 

 

BoltCount.java

 

 

BoltReport.java

 

 

WordCountTopology.java

 

 

编译运行

 

 

执行后会输出大量日志信息,之后会循环输出单词统计信息,例如:

 

 

6

 

核心概念

 

Spouts - 数据流的源头

 

Storm 从外部接收数据,例如 Twitter Streaming API、Kafka ……,通过 Spouts 从这些数据源读取数据。

 

Bolts - 逻辑处理单元

 

Spouts 向 Bolts 传递数据,Bolts 接收数据进行处理操作,然后把结果再发射出去。

 

Bolts 的常见操作例如:过滤、聚合、连接、与数据源数据库交互。

 

Tuple - 数据单元

 

Storm 数据流中的数据单元,例如水流中是一滴滴的水滴,Storm 数据流中流淌的就是一个个的 tuple,里面包含着数据。

 

Stream - 数据流

 

是一个无序的 Tuple 序列。

 

Topology - 拓扑

 

Spouts 和 Bolts 连接起来之后形成了 Topology,其中定义了整个应用的实时处理逻辑。

 

Topology 是一个有向图,定点是计算单元,边是数据流。

 

Topology 始于 Spout,Spout 向一个或者多个 Bolt 发射数据,Bolt 拥有处理逻辑,Bolt 的输出可以发射给其它 Bolt 作为它们的输入。Storm 会保持 Topology 一直运行,除非杀掉他。

 

Tasks - 任务

 

一个 Task 就是一个 Spout 的执行,或者一个 Bolt 的执行。

 

Workers - 工作进程

 

Worker 负责实际运行 Task,Topology 运行在一个分布式环境中的多个工作节点上,Storm 会把 Tasks 均匀的分布在所有 Worker 上。

 

每个 Worker 都是一个物理 JVM,执行着 Topology 中所有 Task 的一个子集。

 

Stream Grouping - 流分组

 

简单理解就是控制 Tuple 的路由,定义 Tuple 在 Topology 中如何流动。每个 Spout 或 Bolt 都会在集群中执行多个任务,每个任务都对应为一个线程的执行,Stream Grouping 定义的就是如何从一个 Task 集合 向其他 Task 集合 发送 tuple。

 

现在 Storm 中已经有 8 种分组策略,下面看下其中 4 种常用的:

 

(1)Shuffle Grouping

 

随机分配,可以让每个 Bolt 获得数量均衡的 tuple。

 

 

(2)Field Grouping

 

field 名字相同的 tuples 会被组织在一起,例如,如果根据 "user-id" 这个 field 分组,相同 "user-id" 的 tuples 将总是去向同一个 Task,其他的 tuples 则去向不同的 Task。

 

 

(3)Global Grouping

 

全局统一分组,所有数据流都流向同一个 Bolt。

 

 

(4)All Grouping

 

向每个实例都发送一次,主要用于发送信号。

 

 

回顾一下 wordcount 示例中使用的 grouping 方法

 

 

设置第一个 Bolt bolt-split 时使用的是 shuffleGrouping,因为 Spout 发送一行文字,给谁都行,不关心分组,所以使用 shuffleGrouping 随机即可。

 

设置第二个 Bolt bolt-count 时使用的是 fieldsGrouping,因为 bolt-split 是按单词发射的,所以需要让同一个单词被同一个Task处理,就要使用按字段分组方式。

 

设置第三个 Bolt bolt-report 时使用的是 globalGrouping 统一分组,因为到这儿就要汇总了,需要接收所有的统计结果。

 

7

 

集群架构

 

Storm cluster 中有2种类型节点:master node、worker nodes。

 

master node 中运行着一个守护进程,名为 Nimbus,负责向集群中分布代码、分配任务、监控失败状况。每个 worker node 中运行着一个守护进程,名为 Supervisor,负责接收工作任务,根据 Nimbus 的指令来启动或者停止工作进程。

 

每个工作进程执行一个拓扑的子集,一个拓扑包含多个工作进程,这些工作进程散布在集群里的多台机器中。

 

 

Nimbus 与 Supervisors 的协作是通过 ZooKeeper 集群 完成的,Nimbus 与 Supervisors 都是无状态的,状态信息保存在 ZooKeeper 或者本地磁盘中。

 

Nimbus 与 Supervisors 具有高可靠性,即使通过 kill -9 杀掉他们,也会快速重新启动起来,这使得 Storm Cluster 极其稳定。

 

上图是 Storm Cluster 的全景图,下面我们看一下细节:

 

 

概念总结:

  • Nimbus - Storm Cluster 的主节点,其他节点都是工作节点 worker node,主节点负责任务分配、监控失败等管理工作。

  • Supervisor - 负责接受 Nimbus 分配的任务,一个 Supervisor 会有多个工作进程 worker process,并管理这些工作进程来完成接收到的任务。

  • Worker process - 工作进程,一个工作进程会执行某个特定 Topology 的相关任务,它并不直接自己执行任务,而是创建executors 来执行,一个工作进程可以有多个 executor。

  • Executor - 执行器,就是工作进程创建的一个线程,一个执行器会执行一个或多个任务。

  • Task - 执行实际的数据处理,也就是一个 Spout 或 Bolt。

 

简单总结一下:

  1. 一个 Storm cluster 中有一个Nimbus 和多个Supervisor;

  2. 一个 Supervisor 下有多个 Worker process;

  3. 一个 Worker process 有多个 Executor;

  4. 一个 Executor 下有多个 Task。

 

8

 

示例2: 统计通话记录

 

1、需求描述

 

处理通话记录,统计相同呼叫人和被呼叫人的通话次数、通话总时长。

 

例如通话记录:

 

# 呼叫者号码,接收者号码,持续时长

1234123402, 1234123401,20

...

 

统计结果例如:

 

1234123402-1234123401 : 87,2523

1234123402-1234123404 : 95,2919

...

 

2、实现思路

 

构造拓扑图:

 

  • Spout 读取通话日志,发送给 Bolt,数据包括 from, to, duration

  • Bolt1 接收 Spout 的数据,重新组织数据格式为 from-to, duration

  • Bolt2 接收 Bolt1 的数据,对 from-to 相同的数据进行汇总,累计次数、总时长,在最后输出统计结果

  • Topology 中连接 Spout 与 Bolt1、Bolt2,并提交 Storm 执行

 

3、具体实现

 

(1)创建项目目录

 

在合适的位置创建项目目录 storm-mobile

 

(2)新建 pom.xml

 

项目根目录下新建maven工程文件 pom.xml,内容:

 

 

项目根目录下创建源码目录 src/main/java

然后在 java 目录下创建包目录 com/storm/test

 

现在项目的目录结构如下:

 

 

(3)安装依赖

 

项目根目录下执行maven命令

 

mvn package

mvn dependency:tree

 

(4)代码

 

下面是具体代码:

 

FakeCallLogReaderSpout.java

 

 

CallLogCreatorBolt.java

 

 

CallLogCounterBolt.java

 

 

LogAnalyserStorm.java

 

 

(5)编译运行

 

 

会输出大量的 log 信息,执行完成后,会在底部附近输出程序的执行结果,类似如下信息:

 

 

9

 

小结

 

Storm 开发的基本思路:

  • Spout 中与外部数据源对接,然后发送给内部的 Bolt

    Spout 中的主要方法 nextTuple() 被循环调用,在这里面处理数据的接收、发射

  • Bolt 中定义数据处理逻辑,对接收到的数据进行处理

    其主要方法 execute() 每次收到数据时被调用,在这里定义处理逻辑

  • Topology 中把 Spout 和 Bolt 串起来,定义好上下游关系,然后提交到 storm 执行

 

先把这个最简单的思路理解好,然后在此基础上进行扩充,学习更多的用法就简单了,希望本文可以帮助您快速认识 Storm。

 

教程下载:如需下载本文,登录云盘 http://pan.baidu.com/s/1geHiLY7进行下载。

 

最新评论
访客 2017年07月21日

写得好,打赏!

访客 2017年07月21日

那什么是分割relay日志的正确姿势呢

访客 2017年07月20日

有思想的dba

访客 2017年07月20日

应该在延伸一下,慢查询的监控,告警和优化提示,类似…

访客 2017年07月19日

赞一个

活动预告