用定时线程池背上故障了!公司损失了几千万……

爱敲代码的小黄 2024-06-04 11:18:59

 

一、背景

 

大家好呀,上周我们公司由于定时线程池使用不当出了一个故障,几千万的单子可能没了。

 

 

给兄弟们分享分享这个坑,希望兄弟们以后别踩!

 

业务中大量的使用定时线程池(ScheduledExecutorService)执行任务,有时候会忽略掉 Try/Catch 的异常判断。

 

当任务执行报错时,会导致整个定时线程池挂掉,影响业务的正常需求。

 

二、问题

 

我们来模仿一个生产的例子:

 

  • 合作方修改频率低且合作方允许最终一致性

  • 我们有一个定时任务每隔 60 秒去 MySQL 拉取全量的 合作方 数据放至 合作方缓存(本地缓存) 中。

  • 当客户请求时,我们去缓存中拿取合作方即可。

 

 

这样的生产例子应该存在于绝大数公司,代码如下:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
public class Demo {    // 创建定时线程池    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private List partnerCache = new ArrayList<>();
    @PostConstruct    public void init() {        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {            @Override            public void run() {                // 启动时每隔60秒执行一次数据库的刷新                // 将数据缓存至本地                loadPartner();            }        }, 3, 60, TimeUnit.SECONDS);    }
    public void loadPartner() {        // 查询数据库当前最新合作方数据        List partnerList = queryPartners();        // 合作方数据放至缓存        partnerCache.clear();        partnerCache.addAll(partnerList);    }

    public List queryPartners() {        // 数据库挂了!        throw new RuntimeException();    }
}

 

运行上述样例,我们会发现程序不停止,输出一遍 Load start!,一直在运行,但后续不输出 Load start!

 

这个时候我们可以确认:**异常确实导致当前任务不再执行**

 

1、为什么任务报错会影响定时线程池?

 

2、定时线程池是真的挂掉了嘛?

 

3、定时线程池内部是如何执行的?

 

跟着这三个问题,我们一起来看一看 ScheduledExecutorService 的原理介绍。

 

三、原理剖析

 

对于 ScheduledExecutorService 来说,本质上是 延时队列 + 线程池

 

 
1、延时队列介绍

 

DelayQueue 是一个无界的 BlockingQueue,用于放置实现了Delayed接口的对象,只能在到期时才能从队列中取走。

 

这种队列是有序的,即队头对象的延迟到期时间最长。

 

我们看一下 延时队列 里对象的属性:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
class MyDelayedTask implements Delayed{    // 当前任务创建时间    private long start = System.currentTimeMillis();    // 延时时间    private long time ;    // 初始化    public MyDelayedTask(long time) {        this.time = time;    }        /**     * 需要实现的接口,获得延迟时间(用过期时间-当前时间)     */    @Override    public long getDelay(TimeUnit unit) {        return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);    }    /**     * 用于延迟队列内部比较排序(当前时间的延迟时间 - 比较对象的延迟时间)     */    @Override    public int compareTo(Delayed o) {        MyDelayedTask o1 = (MyDelayedTask) o;        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));    }}

 

所以,延时队列的实现原理也很简单:

 

  • 生产端:投递消息时增加时间戳(当前时间+延时时间)

  • 消费端:用当前时间与时间戳进行比较,若小于则消费,反之则循环等待

 

 
2、线程池的原理介绍

 

 

  • 当前的线程池个数低于核心线程数,直接添加核心线程即可。

  • 当前的线程池个数大于核心线程数,将任务添加至阻塞队列中。

  • 如果添加阻塞队列失败,则需要添加非核心线程数处理任务。

  • 如果添加非核心线程数失败(满了),执行拒绝策略。

 

 
3、定时线程的原理

 

我们从定时线程池的创建看:

 

scheduledExecutorService.scheduleAtFixedRate(myTask, 3L, 1L, TimeUnit.SECONDS);

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {    // 初始化我们的任务    // triggerTime:延时的实现    ScheduledFutureTask sft = new ScheduledFutureTask(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));    RunnableScheduledFuture t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t;}private void delayedExecute(RunnableScheduledFuture task) {    // 将当前任务丢进延时队列    super.getQueue().add(task);    // 创建核心线程并启动     ensurePrestart();}
// 时间轮算法private long triggerTime(long delay, TimeUnit unit) {    return now() + delay;}

 

从这里我们可以得到结论:定时线程池通过延时队列来达到定时的目的

 

有一个问题:我们仅仅向 Queue 里面放了一个任务,他是怎么保证执行多次的呢?

 

带着这个问题,我们看一下他拉取任务启动的代码:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
for (;;) {    // 从延时队列中获取任务    Runnable r = workQueue.take();}public RunnableScheduledFuture take(){    for (;;) {        // 获取队列第一个任务        RunnableScheduledFuture first = queue[0];                // 【重点】如果当前队列任务为空,则等待        if (first == null){            available.await();        }                                // 获取当前任务的时间        long delay = first.getDelay(NANOSECONDS);                if (delay <= 0){            // 弹出当前任务            return finishPoll(first);        }                                  }}// 时间戳减去当前时间public long getDelay(TimeUnit unit) {    return unit.convert(time - now(), NANOSECONDS);}

 

当拿到任务(ScheduledFutureTask)之后,会执行任务:task.run()

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
public void run() {   // 执行当前的任务   if (ScheduledFutureTask.super.runAndReset()) {        setNextRunTime();        reExecutePeriodic(outerTask);    }}

protected boolean runAndReset() {    if (state != NEW){        return false;    }    int s = state;    try {        Callable c = callable;        if (c != null && s == NEW) {            try {                // 执行任务                c.call();                 // 【重点!!!】如果任务正常执行成功的话,这里会将ran置为true                // 如果你的任务有问题,会被下面直接捕捉到,不会将此处的ran置为true                ran = true;            } catch (Throwable ex) {                // 出现异常会将state置为EXCEPTIONAL                // 标记当前任务执行失败并将异常赋值到结果                setException(ex);            }finally {                 s = state;            }        }    }    // ran:当前任务是否执行成功    // s:当前任务状态    // ran为false:当前任务执行失败    // s == NEW = false:当前任务状态出现异常    return ran && s == NEW;}

 

如果我们的 runAndReset 返回 false 的话,那么进不去 setNextRunTime 该方法:

 

  •  
  •  
  •  
  •  
  •  
  •  
if (ScheduledFutureTask.super.runAndReset()) {    // 修改当前任务的Time    setNextRunTime();    // 将任务重新丢进队列    reExecutePeriodic(outerTask);}

 

最终,任务没有办法被丢进队列,我们的线程无法拿到任务执行,一直在等待。

 

四、结论

 

通过上面的分析,我们回头看一下开篇的三个问题:

 

 
1、为什么任务报错会影响定时线程池?

 

任务报错不会影响线程池,只是线程池将当前任务给丢失,没有继续放到队列中。

 

 
2、定时线程池是真的挂掉了嘛?

 

定时线程池没有挂,挂的只是报错的任务。

 

 
3、定时线程池内部是如何执行的?

 

  • 线程池 + 延时队列

 

所以,通过上述的讲解,我们应该认识到:定时任务一定要加Try Catch

 

不然一旦发生异常,你就会和作者一样,背故障让公司损失几千万,血的教训!

 

五、总结

 

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留地分享经验,才是对抗互联网寒冬的最佳选择。

 

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

 

作者丨爱敲代码的小黄
来源丨公众号: 爱敲代码的小黄(ID:gh_e5eb1d504c98
dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn
最新评论
访客 2023年08月20日

230721

访客 2023年08月16日

1、导入Mongo Monitor监控工具表结构(mongo_monitor…

访客 2023年08月04日

上面提到: 在问题描述的架构图中我们可以看到,Click…

访客 2023年07月19日

PMM不香吗?

访客 2023年06月20日

如今看都很棒

活动预告