基于Delayed实现一个定时延时任务

在spring中加入一个守护线程+延时队列来处理一些延时任务.比如用户注册后5分钟后发送短信.等等

延时任务Bean的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.ym.common.utils;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* 延时队列Task
*
* @author 李涛
* @version 创建时间:2018年6月16日 下午3:34:43
*/
public class DelayedTask<T extends Runnable> implements Delayed {
/**
* 任务名称
*/
private final String name;
/**
* 到期时间
*/
private final long time;

/**
* 问题对象
*/
private final T task;

private static final AtomicLong atomic = new AtomicLong(0);

private final long n;

public DelayedTask(long timeout, T t, String name) {
this.time = System.nanoTime() + timeout;
this.task = t;
this.name = name;
this.n = atomic.getAndIncrement();
}

/**
* 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed other) {
// TODO Auto-generated method stub
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof DelayedTask) {
DelayedTask<Runnable> x = (DelayedTask) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (getN() < x.getN())
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

public T getTask() {
return this.task;
}

@Override
public int hashCode() {
return task.hashCode();
}

@Override
public boolean equals(Object object) {
if (object instanceof DelayedTask) {
return object.hashCode() == hashCode() ? true : false;
}
return false;
}

public String getName() {
return name;
}

public long getN() {
return n;
}

}

Spring容器Bean的定义

package com.ym.web.bean;

import com.ym.common.utils.DelayedTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 延时队列任务管理
 *
 * @author 李涛
 * @version 创建时间:2018年6月16日 下午3:35:39
 */
@Component
public class TaskQueueBean {

    private static final Logger LOG = LoggerFactory.getLogger(TaskQueueBean.class);

    private static volatile boolean started = false;

    private TaskQueueBean() {
    }

    private static class LazyHolder {
        private static TaskQueueBean taskQueueDaemonThread = new TaskQueueBean();
    }

    public static TaskQueueBean getInstance() {
        return LazyHolder.taskQueueDaemonThread;
    }

    /**
     * 执行任务的线程
     */
    private ExecutorService executor = null;

    /**
     * 创建一个最初为空的新 DelayQueue
     */
    private DelayQueue<DelayedTask<Runnable>> queue = null;

    /**
     * 守护线程
     */
    private Thread daemonThread;

    /**
     * 初始化守护线程
     */
    @PostConstruct
    public synchronized void start() {
        // 1.初始化线程池
        if (!started) {
            started = true;
            executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            queue = new DelayQueue<>();
            // 2.判断是否启动
            if (daemonThread != null && daemonThread.isInterrupted()) {
                daemonThread.start();
                return;
            }
        }
        daemonThread = new Thread() {
            public void run() {
                try {
                    execute();
                } catch (InterruptedException e) {
                    daemonThread.interrupt();
                }
            }
        };
        daemonThread.setDaemon(true);
        daemonThread.setName("DelayedTask");
        daemonThread.start();
        LOG.info("~~~~~~~~~~~~~~~~~~~~延时任务开启~~~~~~~~~~~~~~~~~~~~~~~~~");
    }

    private void execute() throws InterruptedException {
        LOG.info("[ task start {} ]:", System.currentTimeMillis());
        while (started) {
            // 从延迟队列中取值,如果没有对象过期则队列一直等待,
            DelayedTask<Runnable> t1 = queue.take();
            if (t1 != null) {
                // 修改问题的状态
                Runnable task = t1.getTask();
                if (task == null) {
                    continue;
                }
                executor.execute(task);
                LOG.info("[ {}  task {} execute  ] ", t1.getN(), t1.getName());
            }
        }
    }

    /**
     * 添加任务, time 延迟时间 task 任务 用户为问题设置延迟时间
     */
    public void put(long time, Runnable task, String taskName) {
        if (!started) {
            throw new UnsupportedOperationException("请先启动taskQueneBean!");
        }
        // 转换成ns
        long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
        // 创建一个任务
        DelayedTask<Runnable> k = new DelayedTask<Runnable>(nanoTime, task, taskName);
        // 将任务放在延迟的队列中
        queue.put(k);
        LOG.info("新任务:{}加入队列,当前队列任务数量:{}", taskName, queue.size());
    }

    /**
     * 结束
     *
     * @param task
     */
    public boolean endTask(DelayedTask<Runnable> task) {
        if (!started) {
            throw new UnsupportedOperationException("请先启动taskQueneBean!");
        }
        return queue.remove(task);
    }

    /**
     * 手动关闭任务
     */
    public synchronized void stop() {
        if (started) {
            LOG.info("shutdown TaskQueueBean");
            started = false;
            daemonThread.interrupt();
            executor.shutdown();
            daemonThread = null;
            queue = null;
        }
    }
}

基于Delayed实现一个定时延时任务
https://kanchai.club/2022/12/10/实现一个延时任务/
作者
625
发布于
2022年12月10日
许可协议