基于Delayed实现一个定时延时任务
在spring中加入一个守护线程+延时队列来处理一些延时任务.比如用户注册后5分钟后发送短信.等等
延时任务Bean的实现
1 |
|
Spring容器Bean的定义
package com.ym.web.bean;
import com.ym.common.utils.DelayedTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 延时队列任务管理
*
* @author 李涛
* @version 创建时间:2018年6月16日 下午3:35:39
*/
@Component
public class TaskQueueBean {
private static final Logger LOG = LoggerFactory.getLogger(TaskQueueBean.class);
private static volatile boolean started = false;
private TaskQueueBean() {
}
private static class LazyHolder {
private static TaskQueueBean taskQueueDaemonThread = new TaskQueueBean();
}
public static TaskQueueBean getInstance() {
return LazyHolder.taskQueueDaemonThread;
}
/**
* 执行任务的线程
*/
private ExecutorService executor = null;
/**
* 创建一个最初为空的新 DelayQueue
*/
private DelayQueue<DelayedTask<Runnable>> queue = null;
/**
* 守护线程
*/
private Thread daemonThread;
/**
* 初始化守护线程
*/
@PostConstruct
public synchronized void start() {
// 1.初始化线程池
if (!started) {
started = true;
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
queue = new DelayQueue<>();
// 2.判断是否启动
if (daemonThread != null && daemonThread.isInterrupted()) {
daemonThread.start();
return;
}
}
daemonThread = new Thread() {
public void run() {
try {
execute();
} catch (InterruptedException e) {
daemonThread.interrupt();
}
}
};
daemonThread.setDaemon(true);
daemonThread.setName("DelayedTask");
daemonThread.start();
LOG.info("~~~~~~~~~~~~~~~~~~~~延时任务开启~~~~~~~~~~~~~~~~~~~~~~~~~");
}
private void execute() throws InterruptedException {
LOG.info("[ task start {} ]:", System.currentTimeMillis());
while (started) {
// 从延迟队列中取值,如果没有对象过期则队列一直等待,
DelayedTask<Runnable> t1 = queue.take();
if (t1 != null) {
// 修改问题的状态
Runnable task = t1.getTask();
if (task == null) {
continue;
}
executor.execute(task);
LOG.info("[ {} task {} execute ] ", t1.getN(), t1.getName());
}
}
}
/**
* 添加任务, time 延迟时间 task 任务 用户为问题设置延迟时间
*/
public void put(long time, Runnable task, String taskName) {
if (!started) {
throw new UnsupportedOperationException("请先启动taskQueneBean!");
}
// 转换成ns
long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
// 创建一个任务
DelayedTask<Runnable> k = new DelayedTask<Runnable>(nanoTime, task, taskName);
// 将任务放在延迟的队列中
queue.put(k);
LOG.info("新任务:{}加入队列,当前队列任务数量:{}", taskName, queue.size());
}
/**
* 结束
*
* @param task
*/
public boolean endTask(DelayedTask<Runnable> task) {
if (!started) {
throw new UnsupportedOperationException("请先启动taskQueneBean!");
}
return queue.remove(task);
}
/**
* 手动关闭任务
*/
public synchronized void stop() {
if (started) {
LOG.info("shutdown TaskQueueBean");
started = false;
daemonThread.interrupt();
executor.shutdown();
daemonThread = null;
queue = null;
}
}
}
基于Delayed实现一个定时延时任务
https://kanchai.club/2022/12/10/实现一个延时任务/