手动实现单服务的cloud bus功能

场景:在 Spring Cloud 单服务下,不引入 cloud-bus-mq 组件,实现配置的动态刷新。核心的刷新逻辑由 Spring Cloud 已经提供的 RefreshEndpoint 类完成。

  • 编写由 @Import 引入的异步刷新组件

package com.yamibuy.ec.search.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.endpoint.RefreshEndpoint;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.Map;

/**
 * Config-refresh broadcaster for a single service, built on Spring Cloud Stream
 * channels instead of the cloud-bus component.
 *
 * <p>Flow, as implemented here:
 * <ol>
 *   <li>{@link #initConfig()} registers bindings for the input and output channels,
 *       both pointing at the same destination.</li>
 *   <li>{@code GET /config/refresh} ({@link #publish()}) sends a message on the
 *       output channel.</li>
 *   <li>{@link #subscribe(String)} listens on the input channel and calls
 *       {@link RefreshEndpoint#refresh()}.</li>
 * </ol>
 *
 * <p>NOTE(review): delivery from the output channel back to the input channel is
 * presumably handled by whichever binder is on the classpath — confirm against the
 * deployment's binder configuration.
 */
@RestController
@EnableBinding(StreamListenConfigRegistrar.ConfigRefreshChanel.class)
@Slf4j
public class StreamListenConfigRegistrar {

    /** Destination name shared by the input and the output binding. */
    private static final String DESTINATION = "searchConfigExchange";

    @Autowired
    private BindingServiceProperties bindingServiceProperties;

    @Autowired
    private ConfigRefreshChanel configRefreshChanel;

    /**
     * Endpoint whose {@code refresh()} is invoked when a refresh message arrives.
     */
    @Autowired
    private RefreshEndpoint refreshEndpoint;

    /**
     * Registers the channel bindings by convention so that no external binding
     * configuration is required.
     *
     * <p>Each binding gets its own {@link BindingProperties} instance. Registering one
     * shared instance under both names would make any setting later applied to one
     * binding visible on the other as well.
     *
     * <p>Existing entries under these two binding names are replaced.
     */
    @PostConstruct
    public void initConfig() {
        log.info("配置刷新消息声明");
        Map<String, BindingProperties> bindings = bindingServiceProperties.getBindings();
        // Use the channel-name constants so the keys cannot drift from @Input/@Output.
        bindings.put(ConfigRefreshChanel.IN, destinationBinding());
        bindings.put(ConfigRefreshChanel.OUT, destinationBinding());
    }

    /**
     * Creates a fresh binding definition that targets {@link #DESTINATION}.
     *
     * @return a new, unshared {@link BindingProperties} instance
     */
    private static BindingProperties destinationBinding() {
        BindingProperties properties = new BindingProperties();
        properties.setDestination(DESTINATION);
        return properties;
    }

    /**
     * HTTP entry point that broadcasts a refresh request on the output channel.
     *
     * <p>NOTE(review): this is a GET mapping that triggers a side effect; consider
     * whether access to it is restricted elsewhere.
     *
     * @return a 200 response whose body reports the boolean returned by
     *         {@code MessageChannel.send}
     */
    @GetMapping("/config/refresh")
    public ResponseEntity<String> publish() {
        log.info("刷新配置广播");
        boolean ok = configRefreshChanel.searchConfigOutput()
                .send(MessageBuilder.withPayload("refresh config").build());
        return ResponseEntity.ok("config is refresh " + ok + "!");
    }

    /**
     * Handles a message from the input channel by triggering a configuration refresh.
     *
     * @param msg payload of the received message; only logged
     */
    @StreamListener(ConfigRefreshChanel.IN)
    public void subscribe(String msg) {
        log.info("配置刷新消息:{}", msg);
        refreshEndpoint.refresh();
    }

    /**
     * Declaration of the two channels used for the refresh broadcast.
     */
    public interface ConfigRefreshChanel {

        /** Name of the input (subscribing) channel. */
        String IN = "searchConfigInput";

        /** Name of the output (publishing) channel. */
        String OUT = "searchConfigOutput";

        /**
         * @return the channel on which refresh messages are received
         */
        @Input(IN)
        SubscribableChannel searchConfigInput();

        /**
         * @return the channel on which refresh messages are sent
         */
        @Output(OUT)
        MessageChannel searchConfigOutput();
    }

}

  • 添加注解类

    /**
     * Opt-in marker for the single-service config refresh.
     *
     * <p>Placing this annotation on a type imports {@code StreamListenConfigRegistrar}
     * through {@code @Import}.
     */
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(StreamListenConfigRegistrar.class)
    public @interface EnableConfigRefresh {
    }

  • 在启动类上添加注解,开启单服务配置刷新

    /**
     * Application entry point; {@code @EnableConfigRefresh} switches on the
     * single-service config refresh.
     */
    @EnableConfigRefresh
    @SpringBootApplication
    public class SearchApplication {

        /**
         * Starts the application.
         *
         * @param args command-line arguments, passed through to {@code SpringApplication.run}
         */
        public static void main(String[] args) {
            SpringApplication.run(SearchApplication.class, args);
        }
    }

手动实现单服务的cloud bus功能
https://kanchai.club/2022/12/10/手动实现单服务的cloud bus功能/
作者
625
发布于
2022年12月10日
许可协议