京东商城基础平台团队:包括大规模容器集群调度、数据库与存储技术、消息系统与服务框架、架构与运维、机器学习与人工智能等技术方向。由京东商城首席架构师刘海锋担任部门负责人。基础平台运营多个数据中心数万台服务器,支撑京东无数在线业务(团队公众号ID:ipdchat)。
导读:本文将简单介绍京东消息中间件的演进历程,以及作为消息中间件,在每一代产品中我们是如何解决MQ面临的一些通用问题,如:如何处理IO,消息如何存储,消息如何路由等等。
我们在开始之前要明确一些基本概念:
Broker:消息中间件服务端的一个实例;
Producer:消息的发送方;
Consumer:消息的接收方;
Topic:消息中间里的数据分类的标示,一个topic也就代表一类消息,producer和consumer通过Topic实现关联。
第一代AMQ
2012年初,京东唯一一个类似消息中间件的产品还是一个基于数据开发的,消息分发系统。所以确切地说,当时京东还没有一个统一的消息中间件平台,但随着公司组织架构的升级,建设一个统一的消息中间件平台就提上了日程。
目标就是建立一消息平台对外提供高可用,可扩展的消息传递服务,并能消息进行有效的治理。
第一代选型:
消息核心:基于ActiveMQ5.6做扩展。
配置中心:MySQL + Zookeeper。
管理监控平台:自主研发。提供比ActiveMQ自带的管理平台更丰富的消息监控功能,并提供消息管理服务
当时选择ActiveMQ是由于它是由JAVA开发符合公司的技术路线(.NET转JAVA),有主从复制方案,性能尚可(单实例TPS 3000+),支持JMS、AMQP等多种技术规范。至于为何选择Zookeeper,可参考我厂另一篇文章《服务框架技术选型实践》中的“注册中心选型”。
AMQ是如何解决MQ面临的一些通用问题:
因为核心是使用的ActiveMQ,所以部分问题就回归到了ActiveMQ是如何解决这些通用问题上来,但其中不乏我们对ActiveMQ的扩展,关于ActiveMQ部分的详细情况我们可以参看其官方文档,以下我们主要看下我们在其基础上的扩展及改进:
1.如何存储?
ActiveMQ使用了KahaDB存储引擎进行存储,BTree索引,为了保证可靠性索引文件和日志文件都要同步刷盘。此外,ActiveMQ通过虚拟主题(VirtualTopic)的方式实现Topic,也就是如果一个Topic有多个订阅者,ActiveMQ就为每个订阅者创建一个队列,那么producer每发一条消息,有多少个订阅者,broker就会将消息复制多少份,将其放到不同订阅者的队列中,broker在从中获取消息推送给consumer。
2. 如何支持集群?
当时的原生的ActiveMQ客户端在收发消息上是这样的:针对一个Topic发送者就只能往特定的单个Broker上发送消息,消费者亦是如此,只能从指定的单个Broker上消费消息。从客户端的角度来看就是不支持服务集群化,一旦分给这个Topic的这组Broker挂掉那针对这个Topic来说整个服务就处于不可用状态。
我们做的第一件事就是让客户端支持集群,我们采用ZK作为配置中心对客户端进行了扩展,使客户端支持了集群的同时还实现了其对服务端动态扩展的支持。以下是集群化之后的客户端与服务端整体架构。
图1:客户端服务端整体架构
3. Push or pull?消息如何路由?
AMQ采用的是push模式,也就是消息由producer发送到broker端之后是由broker推送给consumer。
对于原生的客户端只能在单个Broker上收发消息,那也就不存在消息路由的问题,我们来看下集群化之后消息是如何路由的,这里的路由涉及到两个方面,一方面是producer在发消息是如何选择将消息发往哪个Broker,另一方面就是broker如何决定一条消息应该推送给哪个consumer。
发送路由:
随机发送,这种模式是完全随机的,也就是producer在topic指定的多个broker当中随机选择一个broker向其发消息。
加权随机发送,这种方式下我们可以对一个topic分配的多个broker设置不同的权重,发送时producer就根据权重来进行选择,权重越高被选中的几率越大。这种方式有个好处就是当一个broker有异常时我们就可以将其权重置为0,这样producer就不会往上发消息,待其恢复之后我们再逐步的将权重给予恢复,这就能够避免在单个broker出异常时客户端大量报错。
消费路由:broker随机选择一个当前在线的consumer并将消息推送给它。
图2:客户端路由模型
4. 如何处理消费失败的消息?
原生ActiveMQ的consumer在成功处理一条消息之后会向broker发一个ACK,以确认消息被成功消费。当处理一条消息失败之后就会向broker返回一个消息处理失败的应答,此时当broker收到这条处理失败的应答时会将这条处理失败的消息放到“死信队列”(DLQ)。
针对每个普通队列Broker端都对应着这样一个死信队列,此队列的消息不能被消费者所获取。对此我们对客户端进行了扩展,在消费出错的时候拦截错误信息,然后将出错的消息通过SAF协议(我厂的SOA框架)调用“重试服务”将异常消息入库,入库成功之后consumer再向broker发送一个消费成功的ACK,这时Broker就会将此消息删除,这样我们实际上就是将broker端的消息转移到了库里。而后consumer再根据一定策略通过SAF去调用“重试服务”获取之前入库的消息进行处理。
图3:客户端重试架构图
5. 如何生成消息轨迹?
原生ActiveMQ并不支持消息轨迹,我们扩展了服务端,对写消息和消费消息进行了拦截,在消息入队成功和消费成功后分别记下日志,再由Agent采集日志并进行归档,Agent将元信息写入归档库,然后将消息内容写入JFS(JD File System,我厂的分布式文件系统)。归档成功后可在管理控制平台追踪消息的处理轨迹,必要时还可下载消息体。
6. 其他扩展、优化
优化了broker写消息逻辑将性能提升到单实例TPS 4000+(原生单实例TPS3000+);
实现了新的主从复制(原生主从复制为完全的同步复制,在slave落后时需要手动将Master数据拷贝到Slave,运维极其不方便)
实现新的主从选举使其更易维护,更好的支持集群(原生的主从选举容易出现双Master);
增加了监控告警模块以及其他的一些运维工具,使得管理里更加简单,运维更加方便。
经过一系列的扩展 和优化,AMQ基本上实现了高可用、可扩展以及统一的治理消息等目标,初步形成了一套完整的企业级消息中间件解决方案。以下是AMQ平台粗略的整体架构图:
图4:AMQ整体架构
第二代JMQ
到2014年初AMQ几经优化已经很成熟。但随着公司业务的发展,接入主题数量越来越多,消息量也是成倍增长。
此外随着公司在各地新建机房,应用的部署结构也变得比较复杂,这就有了跨机房部署等需求。加上AMQ本身的一些问题,归结一下主要有:
Broker较重,采用B-Tree索引,性能随消息积压量的上升急剧下降;
Broker端采用的VirtualTopic模式针对一个Topic有多个订阅者的情况会对每个订阅者单独存储一份消息。而京东的生产环境中大部分都是采用VirtualTopic并且每个Topic订阅者都很多,举个例子,比如“订单管道”消息:它有将近100个订阅者,也就是同一个数据要写将近100份,不仅如此,这100份消息还要通过网络发送到Slave上,经过这些流程,写入TPS只能达到几百。所以不管本地写性能性能、网落利用率、还是存储空间利用率来看这种方案都急需调整;
Broker逻辑复杂,其模型就决定了无法在其基础上扩展消息回放、顺序消息、广播消息等个性化需求,而实际使用过程当中又比较渴望我们支持此类特性;
重客户端,由于集群、异常消息重试等功能都是通过扩展ActiveMQ的原生客户端并引入SAF、ZooKeeper等服务得以支持的,一定程度上增加了客户端的复杂度,相应的在客户端的稳定性、可维护性等方面就打了折扣;
注册中心直接暴露给了客户端,这样最明显的一个缺点就是随着客户端实例数的增多,注册中心的连接数越来越多,这就很难对注册中心实施保护措施;
监控数据不完善。
基于以上原因我们2014年初开启了第二代消息中间件JMQ的自研过程。主要做了以下一些工作:
a. JMQ服务端:实现轻量级的存储模型、支持消息回放、支持重试、支持消息轨迹、支持顺序消息、支持广播消息等,并兼容AMQ客户端使用的OpenWire协议;
b. JMQ客户端:实现轻量级客户端只和Broker通信,支持动态接收参数,内置性能采集、支持跨机房;
c. 管理控制平台:管理监控功能更强大;
d. HTTP代理:基于Netty,支持跨语言。
回到JMQ是如何解决MQ面临一些通用问题上来:
1. 如何解决IO问题?
JMQ没有采用AMQ通过自己开发重复造轮子的方式解决IO问题,而是使用了Netty4.0,此框架开源,支持epoll,编程模型相对简单。这在一定程度上减少了服务端的开发工作,也降低了服务端的复杂度。
在应用层,我们自定义了JMQ协议,序列化和反序列化也完全自己开发。这种序列化反序列化方式虽然在一定程度上降低我们的开发效率,但我们不用考虑如果采用第三方的序列化和反序列化方案会带来的性能损耗问题,对于性能上的提升是显而易见的。
2. 如何存储消息?
JMQ存储分为日志文件(journal)和消息队列文件(queue)以及消费位置文件(offset)都存储在Broker所在机器的本地磁盘上。
日志文件,主要存储消息内容,包括消息所在队列文件的位置,有以下特点:
a. 同一broker上不同topic消息存储在同一日志文件上,日志文件按固定大小切分;
b. 文件名为起始全局偏移量;
c. 消息顺序追加。
d. 日志文件同步刷盘;
由于JMQ主要使用在可靠性要求极高的下单、支付等环节,所以broker必须保证收到的每条消息都落到物理磁盘,这样一种日志文件设计主要是为了提高多topic大并发下磁盘的写性能。不仅限于模型的设计,为了提高写性能我们在逻辑上还实现了Group commit。下图是JMQ中Group Commit基本示意图:
图5:Group commit 示意图
消息队列,主要存储消息所在日志文件的全局偏移量,此文件有以下特点:
a. 同一broker上不同topic的队列信息存储在不同的队列文件上,队列文件按固定大小切分;
b. 文件名为全局偏移量;
c. 索引顺序追加;
d. 队列文件异步刷盘。
由于在日志的写入是单线程的,那我们在写入之前就可以提前获取到消息在队列文件的位置,并将这个位置写入到在日志文件当中。所以只要日志文件写成功,即便队列文件写失败,我们也能从日志文件中将队列恢复出来,因此队列文件采用异步刷盘。
图6:日志文件和队列文件模型
消费位置文件:主要用于存储不同订阅者针对于某个topic所消费到的队列的一个偏移量。
下图简单描述了消息在服务端的流转过程:
图7:消息流转示意图
3. 如何容灾?
说到容灾,我们上一节谈到了每条消息在broker上都会落地,但这远远不够,我们必须要保证单机失效甚至机房失效的情况下数据还能被恢复,所以我们需要的一套完整的方案来进行数据灾备。在AMQ里采用的是一主一从,主从分布在一个数据中心,主从同步复制这样的方案。
JMQ里我们实现了一套全新的复制方案:采用一主一从,至少一个备份,主从分布在同一个数据中心、备份分布在其他数据中心,主从同步复制、备份异步复制这样一种方式进行数据灾备。主从同步复制这样就保证了同一条消息我们至少有两个完整备份,即便一个备份丢失,我们还有另一个备份可用。备份节点就保证了极端情况下即使整个数据中心挂掉,我们绝大部分的消息还能得以恢复;
图8:JMQ复制模型
4. Push or pull?消息如何路由?
JMQ采用pull模式,也就是消息由producer发送到broker之后是由consumer主动发起请求去broker上取消息。
发送者的路由和AMQ客户端采取的策略基本相同。
消费者路由和AMQ差不多也是随机,不同点在于JMQ是拉模式,在Broker采取长轮询策略。
5. 如何处理消费失败的消息?
与AMQ不同,由于JMQ 的broker直接就支持重试,在Consumer在处理消息失败时直接向服务端发送一个重试消息命令,服务端接到到命令后将此消息入库。随后在consumer发起拉取消息命令时,服务端再根据一定的策略从库里将消息取出,返回给consumer进行处理。这样做带来一个好处就是JMQ客户端减少了一个第三方服务依赖。
6. 如何记录消息轨迹?
JMQ消息轨迹功能整体流程和AMQ基本相同,主要不同点在于JMQ将消息轨迹相关信息存储到HBase,这样JMQ消息轨迹信息的存储周期变的更长,可以存储的量也更大了,并且采用多种手段优化之后性能也得到了极大的提升。
7. 如何管理元数据?
通过第一部分我们知道,AMQ的元数据会持久化在MySQL,并在入库的同时写入Zookeeper,由ZK再下发到Broker和客户端。这样就有两个问题:
a. 客户端引入了Zookeeper这个第三方服务,引入服务越多那客户端稳定性和可维护性就越差。
b. 注册中心也就是Zookeeper直接暴露给了客户端,这样就会导致注册中心的连接数越来越多在出现故障时缺乏必要的手段对注册中心进行保护。
在JMQ中我们依然利用MySQL来持久化元数据,同时也会将元数据写入Zookeeper,Zookeeper再通知到Broker,但客户端不再直接连接ZooKeeper,而是转而连接Broker,从Broker上获取元数据信息。
由于每个Broker都有全量的元数据信息,所以客户端端连接任意的Broker都能获取到元数据信息。这种设计就带来了几个好处:
a. 减少了客户端对Zookeeper服务的依赖,至此我们客户端就只需和broker通信,客户逻辑得到了简化,客户端稳定得到了极大提升。
b. Zookeeper不再暴露给客户端,这样ZooKeeper的稳定性也有了保证。
c. 由于连接任意一个Broker都能获取到元数据,极端情况下即便有个别broker宕机也不影响客户端获取元数据,所以从另外一个角度来看这又提高了我们注册中心的可用性。
8. 其他
除了以上提到的一些基本问题之外,我们还解决了很多问题,由于篇幅问题就不一一在此罗列说明,其中包括但不限于:
a. 如何实现严格顺序消息;
b. 如何实现广播消息;
c. 如何实现两阶段事务;
d. 如何实现消息回放。
下面是JMQ的一个粗略整体架构图:
图9:JMQ架构图
JMQ性能数据
测试所用机器情况:
场景1:一主一从,Master同步刷盘+同步复制到Slave
场景2:一主一从,Master异步刷盘+异步复制Slave
JMQ规模
Topic数量:1000+
接入应用数量:2500+
单日消息入队数量:500亿+
JMQ3.0
到2016年中,JMQ经过两个大版本的迭代,目前线上运行的JMQ2.0版本已经非常稳定,性能也很优秀,有人可能会问,既然都挺好了你们是不是就没事干了?只需要运维好就行了。
其实不然,随着业务的发展,服务端规模的扩充,如何构建一个多样化、自动化的系统就显得尤为重要,所以在今年下半年我们又规划了我们JMQ3.0,JMQ3.0我们有几个重要的目标:
1. 优化JMQ协议;
2. 优化复制模型;
3. 实现KAFKA协议兼容;
4. 实现全局负载均衡;
5. 实现全新的选举方案;
6. 实现资源的弹性调度。
从这些目标不难看出JMQ3.0在功能上和架构上都会有一个大的升级,所以我们打算通过两次大的迭代逐步实现上述这些目标。目前我们正在进行JMQ3.0第一版联调测试工作,预计在12月底第一个版本就会进行预发。
第一版我们基本实现了Kafka协议兼容、类Raft的主从选举、全局负载均衡、对复制进行了优化,同时实现了一个弹性调度的简单DEMO。从我们目前的一些测试数据来看,新版本在性能上又有了一定的提高。期望JMQ3.0第一期上线后我们再通过一期或者两期的迭代,能够让JMQ更上一层楼。
下图是我们JMQ3.0的一个整体架构:
图10:JMQ3.0整体架构
如果字段的最大可能长度超过255字节,那么长度值可能…
只能说作者太用心了,优秀
感谢详解
一般干个7-8年(即30岁左右),能做到年入40w-50w;有…
230721