rabbitmq实现延时队列任务

此前实现过一个基于redis和jvm的延时队列任务执行,有个弊端就是吞吐量和可靠性上得不到保障,比如系统重启队列任务丢失,需要人工的加载等等。所以此次利用rabbitmq来实现一个延时…

要开发肯定先安装MQ,MQ的安装方式可以自行百度,我这里介绍简单的docker安装,首先安装docker服务,再安装带管理界面的rabbitMQ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#### 1.更新yum源
> yum update

#### 2.安装docker
> yum install -y docker

#### 3拉取镜像
> docker pull rabbitmq:management

#### 4启动容器
> docker run -d --name rabbitmq --privileged=true -p 9158:5672 -p 9159:15672 \
-v /root/program/rabbit/data:/var/lib/rabbitmq \
-v /root/program/rabbit/log:/var/log/rabbitmq/ \
-v /root/program/rabbit/plugins:/plugins/ \
--hostname my-rabbit -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=the_rabbit -e RABBITMQ_DEFAULT_PASS=the_rabbit rabbitmq:management

#### 5插件目录映射,先cp出容器的插件到宿主机,那会将延迟插件放入宿主机目录下,此操作先随便起个mq容器
> docker cp rabbitmq:/plugins/ /root/program/rabbit/

#### 6进入容器方式
> docker exec -it 容器ID /bin/bash

#### 7退出容器
> exit 或者 Ctrl+p+q

#### 8向容器发送命令
> docker exec -d rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

两种延时方式

  1. 死信+普通交换器,依靠消息过期自动进入死信队列,然后消费死信队列的数据这个思路,但是由于这种方式不管设置队列过期时间还是消息过期时间,都不能达到单个队列消息灵活过期的目的。
    比如,先放入队列10s过期消息,再放入2s过期。mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信。
  2. 使用插件rabbitmq_delayed_message_exchange。这个可以很好的解决消息不能灵活过期的问题,但是有个弊端就是很难查看消息堆积的情况,因为他把要发送的延时消息存在本地的分布式mnesia 数据库中,其次过期时间为最大int值,超过这个值得代码判定重复过期设置。

延时插件的使用方式

  1. 去MQ官网下载插件 ++https://www.rabbitmq.com/community-plugins.html++(rabbitmq_delayed_message_exchange)
  2. 把插件放到MQ的安装目录的plugins下
  3. 然后执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
  4. 然后就也可以在web页面查看新的交换器x-delayed-message(其实并不是真正意义上的,真正的只有4个)
  5. 然后上代码实现延时任务,配置文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    spring:
    rabbitmq:
    host: 192.168.0.245
    port: 9158
    username: ym_rabbit
    password: ym_rabbit
    listener:
    simple:
    acknowledge-mode: manual #手动应答
    retry:
    enabled: true

    # 用户自定义配置
    config-center:
    rabbitRuleConfig:
    # 系统标志
    systemMark: local
    # 普通消息
    normalExchange: topic.normal
    # 延时消息
    delayExchange: topic.delay
    # 普通和延时消息死信
    deadExchange: topic.dead

  6. spring中MQ的配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    /**
    * RabbitMQConfig 配置
    *
    * @author: 李涛
    * @version: 2019年05月07日 14:47
    */
    @Configuration
    public class RabbitMQConfig {


    //----------------------------常量定义-----------------------
    private static final String POINT = ".";
    private static final String NORMAL = "nml";
    private static final String DELAY = "dly";
    private static final String QUEUE = "que";

    //----------------------------交换器定义----------------------------
    /**
    * 普通交换器名字
    */
    public static String NORMAL_EXCHANGE = "";

    /**
    * 死信交换器名字
    */
    public static String DEAD_EXCHANGE = "";

    /**
    * 延时交换器名字
    */
    public static String DELAY_EXCHANGE = "";
    //-------------------------队列定义--------------------------

    /**
    * 普通队列
    */
    public static String NORMAL_QUEUE = null;

    /**
    * 延时队列存放任务
    */
    public static String DELAY_QUEUE = null;

    /**
    * 普通死信队列
    */
    public static String DEAD_NORMAL_QUEUE = null;

    /**
    * 延时死信队列
    */
    public static String DEAD_DELAY_QUEUE = null;


    @Autowired
    private ConfigCenterProperties configCenterProperties;

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory rabbitListenerContainerFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitListenerContainerFactory);
    rabbitTemplate.setUsePublisherConnection(true);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
    }

    @PostConstruct
    public void init() {
    RabbitRuleConfig rabbitRuleConfig = configCenterProperties.getRabbitRuleConfig();
    NORMAL_EXCHANGE = rabbitRuleConfig.getNormalExchange() + POINT + rabbitRuleConfig.getSystemMark();
    DEAD_EXCHANGE = rabbitRuleConfig.getDeadExchange() + POINT + rabbitRuleConfig.getSystemMark();
    DELAY_EXCHANGE = rabbitRuleConfig.getDelayExchange() + POINT + rabbitRuleConfig.getSystemMark();
    NORMAL_QUEUE = rabbitRuleConfig.getNormalExchange() + POINT + QUEUE + POINT + rabbitRuleConfig.getSystemMark();
    DELAY_QUEUE = rabbitRuleConfig.getDelayExchange() + POINT + QUEUE + POINT + rabbitRuleConfig.getSystemMark();
    DEAD_NORMAL_QUEUE = rabbitRuleConfig.getDeadExchange() + POINT + QUEUE + POINT + NORMAL + POINT + rabbitRuleConfig.getSystemMark();
    DEAD_DELAY_QUEUE = rabbitRuleConfig.getDeadExchange() + POINT + QUEUE + POINT + DELAY + POINT + rabbitRuleConfig.getSystemMark();
    }


    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setChannelTransacted(true);
    return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory2(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setMessageConverter(new SerializerMessageConverter());
    factory.setChannelTransacted(true);
    return factory;
    }

    //------------------------------交换器声明start---------------------------

    @Bean
    public TopicExchange normalExchange() {
    return new TopicExchange(NORMAL_EXCHANGE);
    }

    @Bean
    public TopicExchange deadExchange() {
    return new TopicExchange(DEAD_EXCHANGE);
    }

    @Bean
    public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "topic");
    return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    //------------------------------交换器声明end---------------------------

    //-------------------------------队列start---------------------------------

    @Bean
    public Queue deadNormalQueue() {
    return new Queue(DEAD_NORMAL_QUEUE);
    }

    @Bean
    public Queue deadDelayQueue() {
    return new Queue(DEAD_DELAY_QUEUE);
    }

    @Bean
    public Queue normalQueue() {
    Map<String, Object> params = new HashMap<>();
    params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    params.put("x-dead-letter-routing-key", DEAD_NORMAL_QUEUE);
    return new Queue(NORMAL_QUEUE, true, false, false, params);
    }

    @Bean
    public Queue delayQueue() {
    Map<String, Object> params = new HashMap<>();
    params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    params.put("x-dead-letter-routing-key", DEAD_DELAY_QUEUE);
    return new Queue(DELAY_QUEUE, true, false, false, params);
    }
    //-------------------------------队列end---------------------------------


    //-------------------------------绑定start---------------------------------

    @Bean
    public Binding bindingNormalExchange(Queue normalQueue, TopicExchange normalExchange) {
    return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_QUEUE);
    }

    @Bean
    public Binding bindingNormalDeadExchange(Queue deadNormalQueue, TopicExchange deadExchange) {
    return BindingBuilder.bind(deadNormalQueue).to(deadExchange).with(DEAD_NORMAL_QUEUE);
    }

    @Bean
    public Binding bindingDelayExchange(Queue delayQueue, CustomExchange delayExchange) {
    return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_QUEUE).noargs();
    }

    @Bean
    public Binding bindingDelayDeadExchange(Queue deadDelayQueue, TopicExchange deadExchange) {
    return BindingBuilder.bind(deadDelayQueue).to(deadExchange).with(DEAD_DELAY_QUEUE);
    }

    //-----------------------------------------绑定end------------------------------------------

    }

7.生产者代码开发,我这里将延时任务和普通消息分开了,所以有2个发送方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94

/**
* 发送消息给MQ
*
* @author: 李涛
* @version: 2019年09月19日 11:58
*/
@Service
public class IMessageSenderSV {

private static final Logger LOG = LoggerFactory.getLogger(IMessageSenderSV.class);

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送云信消息
*
* @param messageTask 消息内容
*/
public void sendMsg(NormalMessageTask messageTask) {
LOG.info("发送[ {} ]消息到MQ", messageTask.getMessageTypeEnum().getDescribe());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_QUEUE, messageTask, (message) -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setMessageId(messageTask.getUuid());
messageProperties.setType(messageTask.getMessageTypeEnum().getDescribe());
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
String sendTime = DateKit.parseDateToStr(DateKit.YYYY_MM_DD_HH_MM_SS, new Date());
// 发送时间
messageProperties.setHeader("send_time", sendTime);
return message;
});
}


/**
* 发送延时任务给队列
*
* @param task 任务
*/
public void sendDelayTask(AbstractDelayedTask task) {
LOG.info("发送延时任务 [ {}:{} ] 到MQ", task.getDescribe(), task.getDelay());
rabbitTemplate.setMessageConverter(new SerializerMessageConverter());
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, RabbitMQConfig.DELAY_QUEUE, task, (message) -> {
MessageProperties messageProperties = message.getMessageProperties();
long nextDelay = 0;
if (task.getDelay() > Integer.MAX_VALUE) {
//如果延时时间大于erlang最大数值,多次延时
messageProperties.setDelay(Integer.MAX_VALUE);
nextDelay = task.getDelay() - Integer.MAX_VALUE;
} else {
messageProperties.setDelay(task.getDelay().intValue());
}
// 下次延时的时间
messageProperties.setHeader("next_delay", nextDelay);
messageProperties.setMessageId(task.getUuid());
messageProperties.setType(task.getDescribe());
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
String sendTime = DateKit.parseDateToStr(DateKit.YYYY_MM_DD_HH_MM_SS, new Date());
// 发送时间
messageProperties.setHeader("send_time", sendTime);
String expirationTime = DateKit.parseDateToStr(DateKit.YYYY_MM_DD_HH_MM_SS, new Date(System.currentTimeMillis() + task.getDelay()));
// 过期时间
messageProperties.setHeader("expiration_time", expirationTime);
// 任务的入参
messageProperties.setHeader("params", task.getParams().toString());
return message;
});
}

/**
* 多次延时,再次发送任务
* @param task 任务
* @param nextDelay 下次延时时间
*/
public void sendAgain(Message task, final long nextDelay) {
rabbitTemplate.setMessageConverter(new SerializerMessageConverter());
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, RabbitMQConfig.DELAY_QUEUE, task, (message) -> {
MessageProperties messageProperties = message.getMessageProperties();
long nextDelayNew = 0;
if (nextDelay > Integer.MAX_VALUE) {
//如果延时时间大于erlang最大数值,多次延时
messageProperties.setDelay(Integer.MAX_VALUE);
nextDelayNew = nextDelay - Integer.MAX_VALUE;
} else {
messageProperties.setDelay((int) nextDelay);
}
// 下次延时的时间
messageProperties.setHeader("next_delay", nextDelayNew);
return message;
});
}
}

8.消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/**
* 消费MQ消息
*
* @author: 李涛
* @version: 2019年09月18日 10:41
*/
@Service
@EnableRabbit
public class IMessageReceiveSV {

private static final Logger LOG = LoggerFactory.getLogger(IMessageReceiveSV.class);

@Autowired
private IMessageSenderSV messageSenderSV;

@Autowired
private IYunxinUserSV yunxinUserSV;

@Autowired
private RabbitTemplate rabbitTemplate;

@Value("${config-center.rabbitRuleConfig.deadExchange}.que.nml.${config-center.rabbitRuleConfig.systemMark}")
private String normalDeadQueue;

@Value("${config-center.rabbitRuleConfig.deadExchange}.que.dly.${config-center.rabbitRuleConfig.systemMark}")
private String delayDeadQueue;

/**
* 普通消息
*/
@RabbitListener(queues = "${config-center.rabbitRuleConfig.normalExchange}.que.${config-center.rabbitRuleConfig.systemMark}", containerFactory = "rabbitListenerContainerFactory")
public void autoNormalMsg(@Payload NormalMessageTask messageTask, @Headers Map<String, Object> headers, Channel channel) throws Exception {
channel.txSelect();
boolean success = normalHandle(messageTask);
if (success) {
channel.basicAck((long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
} else {
channel.basicReject((long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
}
channel.txCommit();
}

/**
* 手动消费普通消息
*/
public void manualConsumptionNormal() {
rabbitTemplate.receiveAndReply(normalDeadQueue, (payload) -> {
NormalMessageTask normalMessageTask = (NormalMessageTask) payload;
boolean success = normalHandle(normalMessageTask);
if (!success) {
throw new AmqpException("普通消息消费异常");
}
return true;
});
}

/**
* 消费普通消息方法
*
* @param messageTask
*/
private boolean normalHandle(NormalMessageTask messageTask) {
try {
MessageTypeEnum messageTypeEnum = messageTask.getMessageTypeEnum();
Object msg = messageTask.getMsg();
LOG.info("消费消息 [ {} ],消息ID为[ {} ]", messageTypeEnum.getDescribe(), messageTask.getUuid());
switch (messageTypeEnum) {
case YUN_XIN: {
yunxinUserSV.syncMessages((String) msg);
}
break;
default: {
// do
LOG.info("未知消息:{}", (String) msg);
}
}
} catch (Exception e) {
if (e instanceof BusinessException) {
LOG.info(e.getMessage());
return true;
} else {
LOG.error("消费异常:{}", ExceptionUtil.getExceptionMessage(e));
return false;
}
}
return true;
}


/**
* 延时消息,执行策略
* <p>
* 能收到说明已经到时间了
*/
@RabbitListener(queues = "${config-center.rabbitRuleConfig.delayExchange}.que.${config-center.rabbitRuleConfig.systemMark}", containerFactory = "rabbitListenerContainerFactory2")
public void autoDelayMsg(@Payload Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
channel.txSelect();
boolean success = delayHandle(message);
if (success) {
channel.basicAck((long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
} else {
channel.basicReject((long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
}
channel.txCommit();
}

/**
* 消费延时消息方法
*
* @param message
*/
private boolean delayHandle(Message message) {
MessageProperties messageProperties = message.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
try {
// 判定是否要多次延时
long nextDelay = (long) headers.get("next_delay");
if (nextDelay > 0) {
messageSenderSV.sendAgain(message, nextDelay);
return true;
}
byte[] body = message.getBody();
if (body != null && body.length > 0) {
//判定为一个有效消息,进行执行
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));) {
AbstractDelayedTask abstractDelayedTask = (AbstractDelayedTask) ois.readObject();
LOG.info("执行延时任务 [ {} ],消息ID为[ {} ],参数为:{}", abstractDelayedTask.getDescribe(), abstractDelayedTask.getUuid(), JSONObject.toJSONString(headers));
abstractDelayedTask.excute();
}
}
return true;
} catch (Throwable e) {
if (e instanceof BusinessException) {
LOG.info(e.getMessage());
return true;
} else {
LOG.error("消费异常:{}", ExceptionUtil.getExceptionMessage(e));
return false;
}
}
}

/**
* 手动消费延时消息
*/
public void manualConsumptionDelay() {
rabbitTemplate.receiveAndReply(delayDeadQueue, (payload) -> {
AbstractDelayedTask abstractDelayedTask = (AbstractDelayedTask) payload;
try {
abstractDelayedTask.excute();
} catch (Exception e) {
LOG.error(ExceptionUtil.getExceptionMessage(e));
throw new AmqpException("延时消息异常");
}
return true;
});
}
}

9.延时任务抽象类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 延时队列Task
*
* @author 李涛
* @version 创建时间:2018年6月16日 下午3:34:43
*/
@Data
public abstract class AbstractDelayedTask implements Serializable {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractDelayedTask.class);

/**
* 任务唯一性标志
*/
private String uuid = UUID.uuid();

/**
* 任务描述
*/
private String describe;

/**
* 多久后执行,单位毫秒
*/
private Long delay;

/**
* 方法需要执行的参数
*/
private JSONObject params;

public AbstractDelayedTask(String describe, long delay, JSONObject params) {
this.describe = describe;
this.delay = delay;
this.params = params;
}

/**
* 执行任务
*/
public void excute() throws Exception {
LOG.info("执行延时任务开始===========》{}", describe);
this.run();
LOG.info("执行延时任务结束===========》{}", describe);
}

public abstract void run() throws Exception;

}

  1. 使用方式
1
2
3
4
5
//15分钟未支付取消订单操作
JSONObject params = new JSONObject();
params.put("id","订单ID");
UnPayCancelOrderTask unPayCancelOrderTask = new UnPayCancelOrderTask("下单后不支付自动取消订单", TimeUnit.MINUTES.toMillis(15), params);
messageSenderSV.sendDelayTask(unPayCancelOrderTask);

rabbitmq实现延时队列任务
https://kanchai.club/2022/12/10/rabbitmq实现延时队列任务/
作者
625
发布于
2022年12月10日
许可协议