在spring中加入一个守护线程+延时队列来处理一些延时任务.比如用户注册后5分钟后发送短信.等等

延时任务Bean的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.ym.common.utils;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* 延时队列Task
*
* @author 李涛
* @version 创建时间:2018年6月16日 下午3:34:43
*/
public class DelayedTask<T extends Runnable> implements Delayed {
/**
* 任务名称
*/
private final String name;
/**
* 到期时间
*/
private final long time;

/**
* 问题对象
*/
private final T task;

private static final AtomicLong atomic = new AtomicLong(0);

private final long n;

public DelayedTask(long timeout, T t, String name) {
this.time = System.nanoTime() + timeout;
this.task = t;
this.name = name;
this.n = atomic.getAndIncrement();
}

/**
* 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed other) {
// TODO Auto-generated method stub
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof DelayedTask) {
DelayedTask<Runnable> x = (DelayedTask) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (getN() < x.getN())
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

public T getTask() {
return this.task;
}

@Override
public int hashCode() {
return task.hashCode();
}

@Override
public boolean equals(Object object) {
if (object instanceof DelayedTask) {
return object.hashCode() == hashCode() ? true : false;
}
return false;
}

public String getName() {
return name;
}

public long getN() {
return n;
}

}

Spring容器Bean的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.ym.web.bean;

import com.ym.common.utils.DelayedTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* 延时队列任务管理
*
* @author 李涛
* @version 创建时间:2018年6月16日 下午3:35:39
*/
@Component
public class TaskQueueBean {

private static final Logger LOG = LoggerFactory.getLogger(TaskQueueBean.class);

private static volatile boolean started = false;

private TaskQueueBean() {
}

private static class LazyHolder {
private static TaskQueueBean taskQueueDaemonThread = new TaskQueueBean();
}

public static TaskQueueBean getInstance() {
return LazyHolder.taskQueueDaemonThread;
}

/**
* 执行任务的线程
*/
private ExecutorService executor = null;

/**
* 创建一个最初为空的新 DelayQueue
*/
private DelayQueue<DelayedTask<Runnable>> queue = null;

/**
* 守护线程
*/
private Thread daemonThread;

/**
* 初始化守护线程
*/
@PostConstruct
public synchronized void start() {
// 1.初始化线程池
if (!started) {
started = true;
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
queue = new DelayQueue<>();
// 2.判断是否启动
if (daemonThread != null && daemonThread.isInterrupted()) {
daemonThread.start();
return;
}
}
daemonThread = new Thread() {
public void run() {
try {
execute();
} catch (InterruptedException e) {
daemonThread.interrupt();
}
}
};
daemonThread.setDaemon(true);
daemonThread.setName("DelayedTask");
daemonThread.start();
LOG.info("~~~~~~~~~~~~~~~~~~~~延时任务开启~~~~~~~~~~~~~~~~~~~~~~~~~");
}

private void execute() throws InterruptedException {
LOG.info("[ task start {} ]:", System.currentTimeMillis());
while (started) {
// 从延迟队列中取值,如果没有对象过期则队列一直等待,
DelayedTask<Runnable> t1 = queue.take();
if (t1 != null) {
// 修改问题的状态
Runnable task = t1.getTask();
if (task == null) {
continue;
}
executor.execute(task);
LOG.info("[ {} task {} execute ] ", t1.getN(), t1.getName());
}
}
}

/**
* 添加任务, time 延迟时间 task 任务 用户为问题设置延迟时间
*/
public void put(long time, Runnable task, String taskName) {
if (!started) {
throw new UnsupportedOperationException("请先启动taskQueneBean!");
}
// 转换成ns
long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
// 创建一个任务
DelayedTask<Runnable> k = new DelayedTask<Runnable>(nanoTime, task, taskName);
// 将任务放在延迟的队列中
queue.put(k);
LOG.info("新任务:{}加入队列,当前队列任务数量:{}", taskName, queue.size());
}

/**
* 结束
*
* @param task
*/
public boolean endTask(DelayedTask<Runnable> task) {
if (!started) {
throw new UnsupportedOperationException("请先启动taskQueneBean!");
}
return queue.remove(task);
}

/**
* 手动关闭任务
*/
public synchronized void stop() {
if (started) {
LOG.info("shutdown TaskQueueBean");
started = false;
daemonThread.interrupt();
executor.shutdown();
daemonThread = null;
queue = null;
}
}
}

`