rabbitmq实现延时队列任务
此前实现过一个基于redis和jvm的延时队列任务执行,有个弊端就是吞吐量和可靠性上得不到保障,比如系统重启队列任务丢失,需要人工的加载等等。所以此次利用rabbitmq来实现一个延时…
要开发肯定先安装MQ,MQ的安装方式可以自行百度,我这里介绍简单的docker安装,首先安装docker服务,再安装带管理界面的rabbitMQ。
1 |
|
两种延时方式
- 死信+普通交换器,依靠消息过期自动进入死信队列,然后消费死信队列的数据这个思路,但是由于这种方式不管设置队列过期时间还是消息过期时间,都不能达到单个队列消息灵活过期的目的。
比如,先放入队列10s过期消息,再放入2s过期。mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信。 - 使用插件rabbitmq_delayed_message_exchange。这个可以很好的解决消息不能灵活过期的问题,但是有个弊端就是很难查看消息堆积的情况,因为他把要发送的延时消息存在本地的分布式mnesia 数据库中,其次过期时间为最大int值,超过这个值得代码判定重复过期设置。
延时插件的使用方式
- 去MQ官网下载插件 ++https://www.rabbitmq.com/community-plugins.html++(rabbitmq_delayed_message_exchange)
- 把插件放到MQ的安装目录的plugins下
- 然后执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
- 然后就也可以在web页面查看新的交换器x-delayed-message(其实并不是真正意义上的,真正的只有4个)
- 然后上代码实现延时任务,配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24spring:
rabbitmq:
host: 192.168.0.245
port: 9158
username: ym_rabbit
password: ym_rabbit
listener:
simple:
acknowledge-mode: manual #手动应答
retry:
enabled: true
# 用户自定义配置
config-center:
rabbitRuleConfig:
# 系统标志
systemMark: local
# 普通消息
normalExchange: topic.normal
# 延时消息
delayExchange: topic.delay
# 普通和延时消息死信
deadExchange: topic.dead - spring中MQ的配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175/**
* RabbitMQConfig 配置
*
* @author: 李涛
* @version: 2019年05月07日 14:47
*/
@Configuration
public class RabbitMQConfig {
//----------------------------常量定义-----------------------
private static final String POINT = ".";
private static final String NORMAL = "nml";
private static final String DELAY = "dly";
private static final String QUEUE = "que";
//----------------------------交换器定义----------------------------
/**
* 普通交换器名字
*/
public static String NORMAL_EXCHANGE = "";
/**
* 死信交换器名字
*/
public static String DEAD_EXCHANGE = "";
/**
* 延时交换器名字
*/
public static String DELAY_EXCHANGE = "";
//-------------------------队列定义--------------------------
/**
* 普通队列
*/
public static String NORMAL_QUEUE = null;
/**
* 延时队列存放任务
*/
public static String DELAY_QUEUE = null;
/**
* 普通死信队列
*/
public static String DEAD_NORMAL_QUEUE = null;
/**
* 延时死信队列
*/
public static String DEAD_DELAY_QUEUE = null;
@Autowired
private ConfigCenterProperties configCenterProperties;
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory rabbitListenerContainerFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitListenerContainerFactory);
rabbitTemplate.setUsePublisherConnection(true);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@PostConstruct
public void init() {
RabbitRuleConfig rabbitRuleConfig = configCenterProperties.getRabbitRuleConfig();
NORMAL_EXCHANGE = rabbitRuleConfig.getNormalExchange() + POINT + rabbitRuleConfig.getSystemMark();
DEAD_EXCHANGE = rabbitRuleConfig.getDeadExchange() + POINT + rabbitRuleConfig.getSystemMark();
DELAY_EXCHANGE = rabbitRuleConfig.getDelayExchange() + POINT + rabbitRuleConfig.getSystemMark();
NORMAL_QUEUE = rabbitRuleConfig.getNormalExchange() + POINT + QUEUE + POINT + rabbitRuleConfig.getSystemMark();
DELAY_QUEUE = rabbitRuleConfig.getDelayExchange() + POINT + QUEUE + POINT + rabbitRuleConfig.getSystemMark();
DEAD_NORMAL_QUEUE = rabbitRuleConfig.getDeadExchange() + POINT + QUEUE + POINT + NORMAL + POINT + rabbitRuleConfig.getSystemMark();
DEAD_DELAY_QUEUE = rabbitRuleConfig.getDeadExchange() + POINT + QUEUE + POINT + DELAY + POINT + rabbitRuleConfig.getSystemMark();
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setChannelTransacted(true);
return factory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory2(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new SerializerMessageConverter());
factory.setChannelTransacted(true);
return factory;
}
//------------------------------交换器声明start---------------------------
@Bean
public TopicExchange normalExchange() {
return new TopicExchange(NORMAL_EXCHANGE);
}
@Bean
public TopicExchange deadExchange() {
return new TopicExchange(DEAD_EXCHANGE);
}
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
//------------------------------交换器声明end---------------------------
//-------------------------------队列start---------------------------------
@Bean
public Queue deadNormalQueue() {
return new Queue(DEAD_NORMAL_QUEUE);
}
@Bean
public Queue deadDelayQueue() {
return new Queue(DEAD_DELAY_QUEUE);
}
@Bean
public Queue normalQueue() {
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", DEAD_NORMAL_QUEUE);
return new Queue(NORMAL_QUEUE, true, false, false, params);
}
@Bean
public Queue delayQueue() {
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", DEAD_DELAY_QUEUE);
return new Queue(DELAY_QUEUE, true, false, false, params);
}
//-------------------------------队列end---------------------------------
//-------------------------------绑定start---------------------------------
@Bean
public Binding bindingNormalExchange(Queue normalQueue, TopicExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_QUEUE);
}
@Bean
public Binding bindingNormalDeadExchange(Queue deadNormalQueue, TopicExchange deadExchange) {
return BindingBuilder.bind(deadNormalQueue).to(deadExchange).with(DEAD_NORMAL_QUEUE);
}
@Bean
public Binding bindingDelayExchange(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_QUEUE).noargs();
}
@Bean
public Binding bindingDelayDeadExchange(Queue deadDelayQueue, TopicExchange deadExchange) {
return BindingBuilder.bind(deadDelayQueue).to(deadExchange).with(DEAD_DELAY_QUEUE);
}
//-----------------------------------------绑定end------------------------------------------
}
7.生产者代码开发,我这里将延时任务和普通消息分开了,所以有2个发送方法。
1 |
|
8.消费者代码
1 |
|
9.延时任务抽象类定义
1 |
|
- 使用方式
1 |
|
rabbitmq实现延时队列任务
https://kanchai.club/2022/12/10/rabbitmq实现延时队列任务/